You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/06/05 05:23:02 UTC
[14/67] [abbrv] kylin git commit: Revert "reformat code"
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
index c6b8f56..ce19500 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
@@ -52,8 +52,7 @@ public class HadoopShellExecutable extends AbstractExecutable {
Preconditions.checkNotNull(mapReduceJobClass);
Preconditions.checkNotNull(params);
try {
- final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil
- .forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
+ final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
final AbstractHadoopJob job = constructor.newInstance();
String[] args = params.trim().split("\\s+");
logger.info("parameters of the HadoopShellExecutable: {}", params);
@@ -69,8 +68,7 @@ public class HadoopShellExecutable extends AbstractExecutable {
result = 2;
}
log.append("result code:").append(result);
- return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString())
- : new ExecuteResult(ExecuteResult.State.FAILED, log.toString());
+ return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString()) : new ExecuteResult(ExecuteResult.State.FAILED, log.toString());
} catch (ReflectiveOperationException e) {
logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java
index 8e2d634..d32928f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java
@@ -55,11 +55,9 @@ public class HadoopStatusChecker {
}
JobStepStatusEnum status = null;
try {
- final Pair<RMAppState, FinalApplicationStatus> result = new HadoopStatusGetter(yarnUrl, mrJobID)
- .get(useKerberosAuth);
+ final Pair<RMAppState, FinalApplicationStatus> result = new HadoopStatusGetter(yarnUrl, mrJobID).get(useKerberosAuth);
logger.debug("State of Hadoop job: " + mrJobID + ":" + result.getLeft() + "-" + result.getRight());
- output.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: "
- + mrJobID + ":" + result.getLeft() + " - " + result.getRight() + "\n");
+ output.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: " + mrJobID + ":" + result.getLeft() + " - " + result.getRight() + "\n");
switch (result.getRight()) {
case SUCCEEDED:
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
index f20e0a1..189e019 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
@@ -80,9 +80,7 @@ public class JobInfoConverter {
}
if (task instanceof MapReduceExecutable) {
result.setExecCmd(((MapReduceExecutable) task).getMapReduceParams());
- result.setExecWaitTime(
- AbstractExecutable.getExtraInfoAsLong(stepOutput, MapReduceExecutable.MAP_REDUCE_WAIT_TIME, 0L)
- / 1000);
+ result.setExecWaitTime(AbstractExecutable.getExtraInfoAsLong(stepOutput, MapReduceExecutable.MAP_REDUCE_WAIT_TIME, 0L) / 1000);
}
if (task instanceof HadoopShellExecutable) {
result.setExecCmd(((HadoopShellExecutable) task).getJobParams());
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 02c8f45..07efb34 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -108,8 +108,7 @@ public class MapReduceExecutable extends AbstractExecutable {
job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID) + " resumed");
} else {
- final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil
- .forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
+ final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
final AbstractHadoopJob hadoopJob = constructor.newInstance();
hadoopJob.setConf(HadoopUtil.getCurrentConfiguration());
hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away
@@ -155,8 +154,7 @@ public class MapReduceExecutable extends AbstractExecutable {
mgr.updateJobOutput(getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin");
return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin");
}
- if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED
- || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) {
+ if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) {
final long waitTime = System.currentTimeMillis() - getStartTime();
setMapReduceWaitTime(waitTime);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
index dfb0b8b..9ab42ea 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
@@ -18,8 +18,6 @@
package org.apache.kylin.engine.mr.common;
-import java.io.Serializable;
-
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.SplittedBytes;
@@ -32,6 +30,8 @@ import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+
/**
*/
public class NDCuboidBuilder implements Serializable {
@@ -57,6 +57,7 @@ public class NDCuboidBuilder implements Serializable {
this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
}
+
public Pair<Integer, ByteArray> buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid);
@@ -66,7 +67,7 @@ public class NDCuboidBuilder implements Serializable {
long mask = Long.highestOneBit(parentCuboid.getId());
long parentCuboidId = parentCuboid.getId();
long childCuboidId = childCuboid.getId();
- long parentCuboidIdActualLength = (long) Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
+ long parentCuboidIdActualLength = (long)Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
int index = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
for (int i = 0; i < parentCuboidIdActualLength; i++) {
if ((mask & parentCuboidId) > 0) {// if the this bit position equals
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index e6f976a..93e413b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -18,10 +18,6 @@
package org.apache.kylin.engine.mr.steps;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
@@ -38,6 +34,10 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
/**
*/
abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
@@ -66,12 +66,12 @@ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
cubeDesc = cube.getDescriptor();
cubeSegment = cube.getSegmentById(segmentID);
- CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(
- EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+ CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc);
}
+
protected void outputKV(String[] flatRow, Context context) throws IOException, InterruptedException {
byte[] rowKey = baseCuboidBuilder.buildKey(flatRow);
outputKey.set(rowKey, 0, rowKey.length);
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index 6b5c8d1..98ebbb4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -75,15 +75,13 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
Path colDir = new Path(factColumnsInputPath, col.getIdentity());
FileSystem fs = HadoopUtil.getWorkingFileSystem();
- Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir,
- col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
+ Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
if (dictFile == null) {
logger.info("Dict for '" + col.getName() + "' not pre-built.");
return null;
}
- try (SequenceFile.Reader reader = new SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(),
- SequenceFile.Reader.file(dictFile))) {
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(), SequenceFile.Reader.file(dictFile))) {
NullWritable key = NullWritable.get();
BytesWritable value = new BytesWritable();
reader.next(key, value);
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
index acc224e..65c5869 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
@@ -23,17 +23,17 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
import org.apache.commons.lang.StringUtils;
+
+import com.google.common.collect.Lists;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.execution.ExecutableContext;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import javax.annotation.Nullable;
public class CubingExecutableUtil {
@@ -78,14 +78,13 @@ public class CubingExecutableUtil {
final CubeInstance cube = mgr.getCube(cubeName);
if (cube == null) {
- String cubeList = StringUtils
- .join(Iterables.transform(mgr.listAllCubes(), new Function<CubeInstance, String>() {
- @Nullable
- @Override
- public String apply(@Nullable CubeInstance input) {
- return input.getName();
- }
- }).iterator(), ",");
+ String cubeList = StringUtils.join(Iterables.transform(mgr.listAllCubes(), new Function<CubeInstance, String>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable CubeInstance input) {
+ return input.getName();
+ }
+ }).iterator(), ",");
throw new IllegalStateException("target cube name: " + cubeName + " cube list: " + cubeList);
}
@@ -93,14 +92,13 @@ public class CubingExecutableUtil {
final CubeSegment newSegment = cube.getSegmentById(segmentId);
if (newSegment == null) {
- String segmentList = StringUtils
- .join(Iterables.transform(cube.getSegments(), new Function<CubeSegment, String>() {
- @Nullable
- @Override
- public String apply(@Nullable CubeSegment input) {
- return input.getUuid();
- }
- }).iterator(), ",");
+ String segmentList = StringUtils.join(Iterables.transform(cube.getSegments(), new Function<CubeSegment, String>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable CubeSegment input) {
+ return input.getUuid();
+ }
+ }).iterator(), ",");
throw new IllegalStateException("target segment id: " + segmentId + " segment list: " + segmentList);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index fcea420..6a8ba4c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -93,8 +93,7 @@ public class CuboidJob extends AbstractHadoopJob {
CubeSegment segment = cube.getSegmentById(segmentID);
if (checkSkip(cubingJobId)) {
- logger.info(
- "Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segmentID + "[" + segmentID + "]");
+ logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segmentID + "[" + segmentID + "]");
return 0;
}
@@ -142,8 +141,7 @@ public class CuboidJob extends AbstractHadoopJob {
if ("FLAT_TABLE".equals(input)) {
// base cuboid case
- IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg)
- .getFlatTableInputFormat();
+ IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
flatTableInputFormat.configureJob(job);
} else {
// n-dimension cuboid case
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index f7e8e4b..495be77 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeManager;
@@ -35,8 +36,6 @@ import org.apache.kylin.metadata.model.MeasureDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-
/**
* @author George Song (ysong1)
*
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
index 4a5cf07..a367bc6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
@@ -26,8 +26,7 @@ import org.apache.kylin.engine.mr.KylinReducer;
/**
* @author yangli9
*/
-public class FactDistinctColumnsCombiner
- extends KylinReducer<SelfDefineSortableKey, Text, SelfDefineSortableKey, Text> {
+public class FactDistinctColumnsCombiner extends KylinReducer<SelfDefineSortableKey, Text, SelfDefineSortableKey, Text> {
@Override
protected void setup(Context context) throws IOException {
@@ -35,8 +34,7 @@ public class FactDistinctColumnsCombiner
}
@Override
- public void doReduce(SelfDefineSortableKey key, Iterable<Text> values, Context context)
- throws IOException, InterruptedException {
+ public void doReduce(SelfDefineSortableKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// for hll, each key only has one output, no need to do local combine;
// for normal col, values are empty text
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index fa6c62f..ee0989a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -94,9 +94,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
}
if (reducerCount > 255) {
- throw new IllegalArgumentException(
- "The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount
- + ", decrease 'kylin.engine.mr.uhc-reducer-count'");
+ throw new IllegalArgumentException("The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 'kylin.engine.mr.uhc-reducer-count'");
}
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
@@ -154,14 +152,11 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
job.setNumReduceTasks(numberOfReducers);
//make each reducer output to respective dir
- MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
- NullWritable.class, Text.class);
- MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
- NullWritable.class, BytesWritable.class);
- MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
- LongWritable.class, BytesWritable.class);
- MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
- NullWritable.class, LongWritable.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class);
+
FileOutputFormat.setOutputPath(job, output);
job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 480ef95..713b7f7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -43,6 +43,8 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
+
+
/**
*/
public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
@@ -53,6 +55,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
BYTES
}
+
protected boolean collectStatistics = false;
protected CuboidScheduler cuboidScheduler = null;
protected int nRowKey;
@@ -84,8 +87,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
tmpbuf = ByteBuffer.allocate(4096);
collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
if (collectStatistics) {
- samplingPercentage = Integer
- .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+ samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
cuboidScheduler = new CuboidScheduler(cubeDesc);
nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
@@ -101,6 +103,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
}
+
TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
if (partitionColRef != null) {
partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
@@ -126,9 +129,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
isUsePutRowKeyToHllNewAlgorithm = true;
rowHashCodesLong = new long[nRowKey];
hf = Hashing.murmur3_128();
- logger.info(
- "Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518",
- cubeDesc.getVersion());
+ logger.info("Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518", cubeDesc.getVersion());
}
}
@@ -159,7 +160,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record);
- for (String[] row : rowCollection) {
+ for (String[] row: rowCollection) {
context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
for (int i = 0; i < factDictCols.size(); i++) {
String fieldValue = row[dictionaryColumnIndex[i]];
@@ -172,8 +173,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
reducerIndex = columnIndexToReducerBeginId.get(i);
} else {
//for the uhc
- reducerIndex = columnIndexToReducerBeginId.get(i)
- + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
+ reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
}
tmpbuf.clear();
@@ -192,8 +192,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
// log a few rows for troubleshooting
if (rowCount < 10) {
- logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer "
- + reducerIndex);
+ logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
}
}
@@ -302,6 +301,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
}
}
+
private int countNewSize(int oldSize, int dataSize) {
int newSize = oldSize * 2;
while (newSize < dataSize) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 2e55a52..458af69 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -43,8 +43,7 @@ import org.apache.kylin.metadata.model.TblColRef;
/**
*/
-abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN>
- extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> {
+abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> {
protected String cubeName;
protected CubeInstance cube;
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index f5f03e2..7f01c3a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.google.common.base.Preconditions;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
@@ -51,7 +52,6 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -112,8 +112,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
isStatistics = true;
baseCuboidRowCountInMappers = Lists.newArrayList();
cuboidHLLMap = Maps.newHashMap();
- samplingPercentage = Integer
- .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+ samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
logger.info("Reducer " + taskId + " handling stats");
} else if (collectStatistics && (taskId == numberOfTasks - 2)) {
// partition col
@@ -134,7 +133,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
buildDictInReducer = false;
}
- if (config.getUHCReducerCount() > 1) {
+ if(config.getUHCReducerCount() > 1) {
int[] uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc);
int colIndex = reducerIdToColumnIndex.get(taskId);
if (uhcIndex[colIndex] == 1)
@@ -163,8 +162,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
}
@Override
- public void doReduce(SelfDefineSortableKey skey, Iterable<Text> values, Context context)
- throws IOException, InterruptedException {
+ public void doReduce(SelfDefineSortableKey skey, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Text key = skey.getText();
if (isStatistics) {
// for hll
@@ -245,12 +243,9 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
// output written to baseDir/colName/colName.pci-r-00000 (etc)
String partitionFileName = col.getIdentity() + "/" + col.getName() + PARTITION_COL_INFO_FILE_POSTFIX;
- mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMinValue),
- partitionFileName);
- mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMaxValue),
- partitionFileName);
- logger.info("write partition info for col : " + col.getName() + " minValue:" + timeMinValue + " maxValue:"
- + timeMaxValue);
+ mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMinValue), partitionFileName);
+ mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMaxValue), partitionFileName);
+ logger.info("write partition info for col : " + col.getName() + " minValue:" + timeMinValue + " maxValue:" + timeMaxValue);
}
}
@@ -258,13 +253,11 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
// output written to baseDir/colName/colName.rldict-r-00000 (etc)
String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream outputStream = new DataOutputStream(baos);) {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(baos);) {
outputStream.writeUTF(dict.getClass().getName());
dict.write(outputStream);
- mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new BytesWritable(baos.toByteArray()),
- dictFileName);
+ mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new BytesWritable(baos.toByteArray()), dictFileName);
}
}
@@ -280,23 +273,19 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
grandTotal += hll.getCountEstimate();
}
double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
- mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1),
- new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
+ mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
// mapper number at key -2
- mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2),
- new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);
+ mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2), new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);
// sampling percentage at key 0
- mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L),
- new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);
+ mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);
for (long i : allCuboids) {
valueBuf.clear();
cuboidHLLMap.get(i).writeRegisters(valueBuf);
valueBuf.flip();
- mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i),
- new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);
+ mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
index d55b775..a04fb43 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
@@ -40,7 +40,7 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O
@Override
public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(value);
- for (String[] row : rowCollection) {
+ for (String[] row: rowCollection) {
try {
outputKV(row, context);
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 7d8320a..73a2eb9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -111,8 +111,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
job.setOutputValueClass(Text.class);
// set input
- IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment)
- .getFlatTableInputFormat();
+ IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
flatTableInputFormat.configureJob(job);
// set output
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 65d1525..eee189c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -101,8 +101,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
cubeBuilder.setConcurrentThreads(taskCount);
ExecutorService executorService = Executors.newSingleThreadExecutor();
- future = executorService.submit(
- cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
+ future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
}
@@ -120,7 +119,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
// put each row to the queue
Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record);
- for (String[] row : rowCollection) {
+ for(String[] row: rowCollection) {
List<String> rowAsList = Arrays.asList(row);
while (!future.isDone()) {
if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
@@ -143,8 +142,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
try {
future.get();
} catch (Exception e) {
- throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(),
- e);
+ throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
}
queue.clear();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index 65ba841..244889f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -74,8 +74,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
}
@Override
- public void doReduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context)
- throws IOException, InterruptedException {
+ public void doReduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
aggs.reset();
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index c1a55da..d183f90 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -31,8 +31,7 @@ public class MapContextGTRecordWriter extends KVGTRecordWriter {
protected MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext;
- public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext,
- CubeDesc cubeDesc, CubeSegment cubeSegment) {
+ public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
super(cubeDesc, cubeSegment);
this.mapContext = mapContext;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 7c50f23..a603fc8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -110,8 +110,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
// decide which source segment
FileSplit fileSplit = (FileSplit) context.getInputSplit();
- IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(mergedCubeSegment)
- .getOuputFormat();
+ IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(mergedCubeSegment).getOuputFormat();
sourceCubeSegment = outputFormat.findSourceSegment(fileSplit, cube);
rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
@@ -185,8 +184,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
}
- int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0,
- splittedByteses[useSplit].length);
+ int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, splittedByteses[useSplit].length);
int idInMergedDict;
//int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
@@ -207,8 +205,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
}
- System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset,
- splittedByteses[useSplit].length);
+ System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset, splittedByteses[useSplit].length);
bufOffset += splittedByteses[useSplit].length;
}
}
@@ -242,14 +239,13 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
Boolean ret = dimensionsNeedDict.get(col);
if (ret != null)
return ret;
-
+
ret = cubeDesc.getRowkey().isUseDictionary(col);
if (ret) {
- TableRef srcTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), col)
- .getTableRef();
+ TableRef srcTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), col).getTableRef();
ret = cubeDesc.getModel().isFactTable(srcTable);
}
-
+
dimensionsNeedDict.put(col, ret);
return ret;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
index f6658c8..4ca132c 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -103,8 +103,7 @@ public class MergeDictionaryStep extends AbstractExecutable {
* @param newSeg
* @throws IOException
*/
- private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, CubeSegment newSeg,
- List<CubeSegment> mergingSegments) throws IOException {
+ private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) throws IOException {
HashSet<TblColRef> colsNeedMeringDict = new HashSet<TblColRef>();
HashSet<TblColRef> colsNeedCopyDict = new HashSet<TblColRef>();
DictionaryManager dictMgr = DictionaryManager.getInstance(conf);
@@ -143,8 +142,7 @@ public class MergeDictionaryStep extends AbstractExecutable {
}
}
- private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, CubeSegment cubeSeg, List<DictionaryInfo> dicts,
- TblColRef col) throws IOException {
+ private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, CubeSegment cubeSeg, List<DictionaryInfo> dicts, TblColRef col) throws IOException {
DictionaryInfo dictInfo = dictMgr.mergeDictionary(dicts);
if (dictInfo != null)
cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 17c4d03..04d8231 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -76,8 +76,7 @@ public class MergeStatisticsStep extends AbstractExecutable {
int averageSamplingPercentage = 0;
for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) {
- String fileKey = CubeSegment
- .getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
+ String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
InputStream is = rs.getResource(fileKey).inputStream;
File tempFile = null;
FileOutputStream tempFileStream = null;
@@ -121,13 +120,9 @@ public class MergeStatisticsStep extends AbstractExecutable {
tempFile.delete();
}
}
- averageSamplingPercentage = averageSamplingPercentage
- / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
- CubeStatsWriter.writeCuboidStatistics(conf,
- new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap,
- averageSamplingPercentage);
- Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()),
- BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+ averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
+ CubeStatsWriter.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage);
+ Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
FSDataInputStream is = fs.open(statisticsFilePath);
try {
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
index 9b41a8e..eee2c00 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
@@ -47,8 +47,7 @@ import com.google.common.collect.Sets;
public class MetadataCleanupJob extends AbstractHadoopJob {
@SuppressWarnings("static-access")
- private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false)
- .withDescription("Delete the unused metadata").create("delete");
+ private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused metadata").create("delete");
protected static final Logger logger = LoggerFactory.getLogger(MetadataCleanupJob.class);
@@ -101,8 +100,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
List<String> toDeleteResource = Lists.newArrayList();
// two level resources, snapshot tables and cube statistics
- for (String resourceRoot : new String[] { ResourceStore.SNAPSHOT_RESOURCE_ROOT,
- ResourceStore.CUBE_STATISTICS_ROOT }) {
+ for (String resourceRoot : new String[] { ResourceStore.SNAPSHOT_RESOURCE_ROOT, ResourceStore.CUBE_STATISTICS_ROOT }) {
NavigableSet<String> snapshotTables = getStore().listResources(resourceRoot);
if (snapshotTables != null) {
@@ -151,9 +149,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
for (ExecutablePO executable : allExecutable) {
long lastModified = executable.getLastModified();
ExecutableOutputPO output = executableDao.getJobOutput(executable.getUuid());
- if (System.currentTimeMillis() - lastModified > TIME_THREADSHOLD_FOR_JOB
- && (ExecutableState.SUCCEED.toString().equals(output.getStatus())
- || ExecutableState.DISCARDED.toString().equals(output.getStatus()))) {
+ if (System.currentTimeMillis() - lastModified > TIME_THREADSHOLD_FOR_JOB && (ExecutableState.SUCCEED.toString().equals(output.getStatus()) || ExecutableState.DISCARDED.toString().equals(output.getStatus()))) {
toDeleteResource.add(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + executable.getUuid());
toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + executable.getUuid());
@@ -164,8 +160,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
}
if (toDeleteResource.size() > 0) {
- logger.info(
- "The following resources have no reference or is too old, will be cleaned from metadata store: \n");
+ logger.info("The following resources have no reference or is too old, will be cleaned from metadata store: \n");
for (String s : toDeleteResource) {
logger.info(s);
@@ -180,8 +175,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
}
public static void main(String[] args) throws Exception {
- logger.warn(
- "org.apache.kylin.engine.mr.steps.MetadataCleanupJob is deprecated, use org.apache.kylin.tool.MetadataCleanupJob instead");
+ logger.warn("org.apache.kylin.engine.mr.steps.MetadataCleanupJob is deprecated, use org.apache.kylin.tool.MetadataCleanupJob instead");
int exitCode = ToolRunner.run(new MetadataCleanupJob(), args);
System.exit(exitCode);
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index 8bf6d4b..b924edc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -18,9 +18,6 @@
package org.apache.kylin.engine.mr.steps;
-import java.io.IOException;
-import java.util.Collection;
-
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ByteArray;
@@ -39,6 +36,9 @@ import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Collection;
+
/**
* @author George Song (ysong1)
*
@@ -79,6 +79,8 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
}
+
+
@Override
public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
long cuboidId = rowKeySplitter.split(key.getBytes());
@@ -103,8 +105,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
for (Long child : myChildren) {
Cuboid childCuboid = Cuboid.findById(cubeDesc, child);
- Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid,
- rowKeySplitter.getSplitBuffers());
+ Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
outputKey.set(result.getSecond().array(), 0, result.getFirst());
context.write(outputKey, value);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
index 8acd499..5c0555a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
@@ -34,15 +34,13 @@ public class ReducerNumSizing {
private static final Logger logger = LoggerFactory.getLogger(ReducerNumSizing.class);
- public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, double totalMapInputMB, int level)
- throws ClassNotFoundException, IOException, InterruptedException, JobException {
+ public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
CubeDesc cubeDesc = cubeSegment.getCubeDesc();
KylinConfig kylinConfig = cubeDesc.getConfig();
double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
- logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level "
- + level);
+ logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level);
CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig);
@@ -52,8 +50,7 @@ public class ReducerNumSizing {
//merge case
double estimatedSize = cubeStatsReader.estimateCubeSize();
adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
- logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize,
- totalMapInputMB, adjustedCurrentLayerSizeEst);
+ logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize, totalMapInputMB, adjustedCurrentLayerSizeEst);
} else if (level == 0) {
//base cuboid case TODO: the estimation could be very WRONG because it has no correction
adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
@@ -62,9 +59,7 @@ public class ReducerNumSizing {
parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
- logger.debug(
- "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}",
- totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
+ logger.debug("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
}
// number of reduce tasks
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java
index 89534fe..3419949 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java
@@ -39,8 +39,7 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
public class RowKeyDistributionCheckerJob extends AbstractHadoopJob {
@SuppressWarnings("static-access")
- protected static final Option ROW_KEY_STATS_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true)
- .withDescription("rowKeyStatsFilePath").create("rowKeyStatsFilePath");
+ protected static final Option ROW_KEY_STATS_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("rowKeyStatsFilePath").create("rowKeyStatsFilePath");
@Override
public int run(String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
index 0af1b85..eab57d1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
@@ -68,7 +68,7 @@ public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Tex
for (Text t : keyList) {
if (key.compareTo(t) < 0) {
Long v = resultMap.get(t);
- long length = (long) key.getLength() + value.getLength();
+ long length = (long)key.getLength() + value.getLength();
v += length;
resultMap.put(t, v);
break;
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
index 1aa406f..d203e8c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
@@ -38,8 +38,7 @@ public class RowKeyDistributionCheckerReducer extends KylinReducer<Text, LongWri
}
@Override
- public void doReduce(Text key, Iterable<LongWritable> values, Context context)
- throws IOException, InterruptedException {
+ public void doReduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long length = 0;
for (LongWritable v : values) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 859cd2e..28f99fb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -54,17 +54,14 @@ public class SaveStatisticsStep extends AbstractExecutable {
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- CubeSegment newSegment = CubingExecutableUtil.findSegment(context,
- CubingExecutableUtil.getCubeName(this.getParams()),
- CubingExecutableUtil.getSegmentId(this.getParams()));
+ CubeSegment newSegment = CubingExecutableUtil.findSegment(context, CubingExecutableUtil.getCubeName(this.getParams()), CubingExecutableUtil.getSegmentId(this.getParams()));
KylinConfig kylinConf = newSegment.getConfig();
ResourceStore rs = ResourceStore.getStore(kylinConf);
try {
FileSystem fs = HadoopUtil.getWorkingFileSystem();
Path statisticsDir = new Path(CubingExecutableUtil.getStatisticsPath(this.getParams()));
- Path statisticsFilePath = HadoopUtil.getFilterOnlyPath(fs, statisticsDir,
- BatchConstants.CFG_OUTPUT_STATISTICS);
+ Path statisticsFilePath = HadoopUtil.getFilterOnlyPath(fs, statisticsDir, BatchConstants.CFG_OUTPUT_STATISTICS);
if (statisticsFilePath == null) {
throw new IOException("fail to find the statistics file in base dir: " + statisticsDir);
}
@@ -114,8 +111,7 @@ public class SaveStatisticsStep extends AbstractExecutable {
double mapperOverlapRatio = cubeStats.getMapperOverlapRatioOfFirstBuild();
double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold();
logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit);
- logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is "
- + overlapThreshold);
+ logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is " + overlapThreshold);
// in-mem cubing is good when
// 1) the cluster has enough mapper slots to run in parallel
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java
index bb4152e..c75abea 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java
@@ -28,7 +28,9 @@ import org.apache.kylin.metadata.datatype.DataType;
public class SelfDefineSortableKey implements WritableComparable<SelfDefineSortableKey> {
public enum TypeFlag {
- NONE_NUMERIC_TYPE, INTEGER_FAMILY_TYPE, DOUBLE_FAMILY_TYPE
+ NONE_NUMERIC_TYPE,
+ INTEGER_FAMILY_TYPE,
+ DOUBLE_FAMILY_TYPE
}
private byte typeId; //non-numeric(0000 0000) int(0000 0001) other numberic(0000 0010)
@@ -59,6 +61,7 @@ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSorta
}
}
+
public void init(Text key, DataType type) {
init(key, getTypeIdByDatatype(type));
}
@@ -110,6 +113,7 @@ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSorta
return (typeId == TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
}
+
public byte getTypeIdByDatatype(DataType type) {
if (!type.isNumberFamily()) {
return (byte) TypeFlag.NONE_NUMERIC_TYPE.ordinal();
@@ -125,3 +129,5 @@ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSorta
}
}
+
+
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index 32eaebd..2efd718 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -88,8 +88,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
Path colDir = new Path(factColumnsInputPath, partitionCol.getIdentity());
FileSystem fs = HadoopUtil.getWorkingFileSystem();
- Path outputFile = HadoopUtil.getFilterOnlyPath(fs, colDir,
- partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
+ Path outputFile = HadoopUtil.getFilterOnlyPath(fs, colDir, partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
if (outputFile == null) {
throw new IOException("fail to find the partition file in base dir: " + colDir);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index 8447e44..add5c42 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -51,8 +51,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
CubeSegment mergedSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
if (mergedSegment == null) {
- return new ExecuteResult(ExecuteResult.State.FAILED,
- "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams()));
+ return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams()));
}
CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
index 1ad6687..39c5bac 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
@@ -297,7 +297,7 @@ public class SortedColumnReaderTest {
}
return result;
}
-
+
private String qualify(String path) {
String absolutePath = new File(path).getAbsolutePath();
if (absolutePath.startsWith("/"))
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java
index e1adbb3..4c43dbc 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java
@@ -35,11 +35,9 @@ public class TableReaderTest {
@Test
public void testBasicReader() throws IOException {
File f = new File("src/test/resources/dict/DW_SITES");
- DFSFileTableReader reader = new DFSFileTableReader("file://" + f.getAbsolutePath(), DFSFileTable.DELIM_AUTO,
- 10);
+ DFSFileTableReader reader = new DFSFileTableReader("file://" + f.getAbsolutePath(), DFSFileTable.DELIM_AUTO, 10);
while (reader.next()) {
- assertEquals("[-1, Korea Auction.co.kr, S, 48, 0, 111, 2009-02-11, , DW_OFFPLAT, ]",
- Arrays.toString(reader.getRow()));
+ assertEquals("[-1, Korea Auction.co.kr, S, 48, 0, 111, 2009-02-11, , DW_OFFPLAT, ]", Arrays.toString(reader.getRow()));
break;
}
reader.close();
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
index 32e80fc..7616df2 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
@@ -77,8 +77,7 @@ public class CubeReducerTest extends LocalFileMetadataTestCase {
reduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, "test_kylin_cube_with_slr_ready");
- CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready")
- .getDescriptor();
+ CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor();
BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures());
Text key1 = new Text("72010ustech");
@@ -102,12 +101,9 @@ public class CubeReducerTest extends LocalFileMetadataTestCase {
List<Pair<Text, Text>> result = reduceDriver.run();
- Pair<Text, Text> p1 = new Pair<Text, Text>(new Text("72010ustech"),
- newValueText(codec, "45.43", "10", "20.34", 3, 600));
- Pair<Text, Text> p2 = new Pair<Text, Text>(new Text("1tech"),
- newValueText(codec, "35.43", "15.09", "20.34", 2, 1500));
- Pair<Text, Text> p3 = new Pair<Text, Text>(new Text("0"),
- newValueText(codec, "146.52", "146.52", "146.52", 0, 0));
+ Pair<Text, Text> p1 = new Pair<Text, Text>(new Text("72010ustech"), newValueText(codec, "45.43", "10", "20.34", 3, 600));
+ Pair<Text, Text> p2 = new Pair<Text, Text>(new Text("1tech"), newValueText(codec, "35.43", "15.09", "20.34", 2, 1500));
+ Pair<Text, Text> p3 = new Pair<Text, Text>(new Text("0"), newValueText(codec, "146.52", "146.52", "146.52", 0, 0));
assertEquals(3, result.size());
@@ -121,8 +117,7 @@ public class CubeReducerTest extends LocalFileMetadataTestCase {
reduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, "test_kylin_cube_with_slr_ready");
reduceDriver.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, 1);
- CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready")
- .getDescriptor();
+ CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor();
MeasureDesc measureDesc = cubeDesc.getMeasures().get(0);
FunctionDesc functionDesc = measureDesc.getFunction();
Field field = FunctionDesc.class.getDeclaredField("measureType");
@@ -153,10 +148,8 @@ public class CubeReducerTest extends LocalFileMetadataTestCase {
List<Pair<Text, Text>> result = reduceDriver.run();
- Pair<Text, Text> p1 = new Pair<Text, Text>(new Text("72010ustech"),
- newValueText(codec, "0", "10", "20.34", 3, 600));
- Pair<Text, Text> p2 = new Pair<Text, Text>(new Text("1tech"),
- newValueText(codec, "0", "15.09", "20.34", 2, 1500));
+ Pair<Text, Text> p1 = new Pair<Text, Text>(new Text("72010ustech"), newValueText(codec, "0", "10", "20.34", 3, 600));
+ Pair<Text, Text> p2 = new Pair<Text, Text>(new Text("1tech"), newValueText(codec, "0", "15.09", "20.34", 2, 1500));
Pair<Text, Text> p3 = new Pair<Text, Text>(new Text("0"), newValueText(codec, "0", "146.52", "146.52", 0, 0));
assertEquals(3, result.size());
@@ -166,10 +159,8 @@ public class CubeReducerTest extends LocalFileMetadataTestCase {
assertTrue(result.contains(p3));
}
- private Text newValueText(BufferedMeasureCodec codec, String sum, String min, String max, int count,
- int item_count) {
- Object[] values = new Object[] { new BigDecimal(sum), new BigDecimal(min), new BigDecimal(max), new Long(count),
- new Long(item_count) };
+ private Text newValueText(BufferedMeasureCodec codec, String sum, String min, String max, int count, int item_count) {
+ Object[] values = new Object[] { new BigDecimal(sum), new BigDecimal(min), new BigDecimal(max), new Long(count), new Long(item_count) };
ByteBuffer buf = codec.encode(values);
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
index 63e09ac..2e2ebf9 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
@@ -80,8 +80,7 @@ public class MergeCuboidJobTest extends LocalFileMetadataTestCase {
// CubeManager cubeManager =
// CubeManager.getInstanceFromEnv(getTestConfig());
- String[] args = { "-input", baseFolder.getAbsolutePath() + "," + eightFoler.getAbsolutePath(), "-cubename",
- cubeName, "-segmentname", "20130331080000_20131212080000", "-output", output, "-jobname", jobname };
+ String[] args = { "-input", baseFolder.getAbsolutePath() + "," + eightFoler.getAbsolutePath(), "-cubename", cubeName, "-segmentname", "20130331080000_20131212080000", "-output", output, "-jobname", jobname };
assertEquals("Job failed", 0, ToolRunner.run(conf, new MergeCuboidJob(), args));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
index f73c645..04af4fe 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
@@ -52,7 +52,7 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings("rawtypes")
public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
-
+
private static final Logger logger = LoggerFactory.getLogger(MergeCuboidMapperTest.class);
MapDriver<Text, Text, Text, Text> mapDriver;
@@ -75,8 +75,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
List<String> values = new ArrayList<>();
values.add("eee");
values.add("fff");
- Dictionary<String> dict = DictionaryGenerator.buildDictionary(DataType.getType(newDictInfo.getDataType()),
- new IterableDictionaryValueEnumerator(values));
+ Dictionary<String> dict = DictionaryGenerator.buildDictionary(DataType.getType(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values));
dictionaryManager.trySaveNewDict(dict, newDictInfo);
dict.dump(System.out);
@@ -128,8 +127,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
values.add("ccc");
else
values.add("bbb");
- Dictionary<String> dict = DictionaryGenerator.buildDictionary(DataType.getType(newDictInfo.getDataType()),
- new IterableDictionaryValueEnumerator(values));
+ Dictionary<String> dict = DictionaryGenerator.buildDictionary(DataType.getType(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values));
dictionaryManager.trySaveNewDict(dict, newDictInfo);
dict.dump(System.out);
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
index e15d463..989ed72 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
@@ -66,8 +66,7 @@ public class NDCuboidJobTest extends LocalFileMetadataTestCase {
FileUtil.fullyDelete(new File(output));
- String[] args = { "-input", input, "-cubename", cubeName, "-segmentname", segmentName, "-output", output,
- "-jobname", jobname, "-level", level };
+ String[] args = { "-input", input, "-cubename", cubeName, "-segmentname", segmentName, "-output", output, "-jobname", jobname, "-level", level };
assertEquals("Job failed", 0, ToolRunner.run(conf, new NDCuboidJob(), args));
}
@@ -82,8 +81,7 @@ public class NDCuboidJobTest extends LocalFileMetadataTestCase {
FileUtil.fullyDelete(new File(output));
- String[] args = { "-input", input, "-cubename", cubeName, "-segmentname", segmentName, "-output", output,
- "-jobname", jobname, "-level", level };
+ String[] args = { "-input", input, "-cubename", cubeName, "-segmentname", segmentName, "-output", output, "-jobname", jobname, "-level", level };
assertEquals("Job failed", 0, ToolRunner.run(conf, new NDCuboidJob(), args));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
index 3ee49f2..c0ce1a4 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
@@ -76,10 +76,8 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
- byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 0, -104, -106, -128, 11, 54, -105, 55, 9, 9, 9, 9, 9, 9, 9, 9, 9,
- 9, 9, 9, 9, 9, 9, 9, 9, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
- byte[] value = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16,
- 56, 92, 114, -80, 118, 1, 1 };
+ byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 0, -104, -106, -128, 11, 54, -105, 55, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+ byte[] value = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 };
Pair<Text, Text> input1 = new Pair<Text, Text>(new Text(key), new Text(value));
mapReduceDriver.addInput(input1);
@@ -88,10 +86,8 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
assertEquals(4, result.size());
- byte[] resultKey = { 0, 0, 0, 0, 0, 0, 0, 0, 1, 127, 0, -104, -106, -128, 55, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9,
- 9, 9, 9, 9, 9, 9, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
- byte[] resultValue = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23,
- -16, 56, 92, 114, -80, 118, 1, 1 };
+ byte[] resultKey = { 0, 0, 0, 0, 0, 0, 0, 0, 1, 127, 0, -104, -106, -128, 55, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+ byte[] resultValue = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 };
Pair<Text, Text> output1 = new Pair<Text, Text>(new Text(resultKey), new Text(resultValue));
//As we will truncate decimal(KYLIN-766), value will no longer equals to resultValue
@@ -110,8 +106,7 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
System.out.println(Bytes.toLong(new byte[] { 0, 0, 0, 0, 0, 0, 1, -1 }));
for (int i = 0; i < result.size(); i++) {
byte[] bytes = new byte[result.get(i).getFirst().getLength()];
- System.arraycopy(result.get(i).getFirst().getBytes(), RowConstants.ROWKEY_SHARDID_LEN, bytes, 0,
- result.get(i).getFirst().getLength() - RowConstants.ROWKEY_SHARDID_LEN);
+ System.arraycopy(result.get(i).getFirst().getBytes(), RowConstants.ROWKEY_SHARDID_LEN, bytes, 0, result.get(i).getFirst().getLength() - RowConstants.ROWKEY_SHARDID_LEN);
System.out.println(Bytes.toLong(bytes));
keySet[i] = Bytes.toLong(bytes);
}