You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by wo...@apache.org on 2018/01/18 09:38:16 UTC
kylin git commit: KYLIN-3177, fix tsRange null issue
Repository: kylin
Updated Branches:
refs/heads/tmp [created] 15e818825
KYLIN-3177, fix tsRange null issue
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/15e81882
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/15e81882
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/15e81882
Branch: refs/heads/tmp
Commit: 15e8188259b5c67ed3ad19281cbd4cb49eec86ba
Parents: c3d1745
Author: Cheng Wang <ch...@kyligence.io>
Authored: Thu Jan 18 17:33:35 2018 +0800
Committer: Cheng Wang <ch...@kyligence.io>
Committed: Thu Jan 18 17:34:23 2018 +0800
----------------------------------------------------------------------
.../mr/steps/UpdateCubeInfoAfterMergeStep.java | 14 +++++++++++---
.../apache/kylin/provision/BuildCubeWithStream.java | 15 +++++++++++----
2 files changed, 22 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/15e81882/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index 3185bec..018abab 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -31,11 +31,10 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.SegmentRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- */
public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterMergeStep.class);
@@ -66,10 +65,15 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
}
long sourceCount = 0L;
long sourceSize = 0L;
+
+ boolean isOffsetCube = mergedSegment.isOffsetCube();
+ Long tsStartMin = Long.MAX_VALUE, tsEndMax = 0L;
for (String id : mergingSegmentIds) {
CubeSegment segment = cube.getSegmentById(id);
sourceCount += segment.getInputRecords();
sourceSize += segment.getInputRecordsSize();
+ tsStartMin = Math.min(tsStartMin, segment.getTSRange().start.v);
+ tsEndMax = Math.max(tsEndMax, segment.getTSRange().end.v);
}
// update segment info
@@ -79,6 +83,11 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
mergedSegment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
mergedSegment.setLastBuildTime(System.currentTimeMillis());
+ if (isOffsetCube == true) {
+ SegmentRange.TSRange tsRange = new SegmentRange.TSRange(tsStartMin, tsEndMax);
+ mergedSegment.setTSRange(tsRange);
+ }
+
try {
cubeManager.promoteNewlyBuiltSegments(cube, mergedSegment);
return new ExecuteResult(ExecuteResult.State.SUCCEED);
@@ -87,5 +96,4 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
return ExecuteResult.createError(e);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/15e81882/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index f9277bc..f7b8275 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -232,7 +232,8 @@ public class BuildCubeWithStream {
for (int i = 0; i < futures.size(); i++) {
ExecutableState result = futures.get(i).get(20, TimeUnit.MINUTES);
logger.info("Checking building task " + i + " whose state is " + result);
- Assert.assertTrue(result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED);
+ Assert.assertTrue(
+ result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED);
if (result == ExecutableState.SUCCEED)
succeedBuild++;
}
@@ -250,6 +251,9 @@ public class BuildCubeWithStream {
segments = cubeManager.getCube(cubeName).getSegments();
Assert.assertTrue(segments.size() == 1);
+ SegmentRange.TSRange tsRange = segments.get(0).getTSRange();
+ Assert.assertTrue(tsRange.duration() > 0);
+
CubeSegment toRefreshSeg = segments.get(0);
refreshSegment(cubeName, toRefreshSeg.getSegRange());
@@ -279,7 +283,8 @@ public class BuildCubeWithStream {
protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
CubeInstance cubeInstance = cubeManager.getCube(cubeName);
ISource source = SourceFactory.getSource(cubeInstance);
- SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance, new SourcePartition(null, new SegmentRange(startOffset, endOffset), null, null));
+ SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance,
+ new SourcePartition(null, new SegmentRange(startOffset, endOffset), null, null));
CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), partition);
DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
jobService.addJob(job);
@@ -298,7 +303,8 @@ public class BuildCubeWithStream {
ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
- throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169");
+ throw new RuntimeException(
+ "No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169");
}
HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
}
@@ -326,7 +332,8 @@ public class BuildCubeWithStream {
protected void waitForJob(String jobId) {
while (true) {
AbstractExecutable job = jobService.getJob(jobId);
- if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR || job.getStatus() == ExecutableState.DISCARDED) {
+ if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR
+ || job.getStatus() == ExecutableState.DISCARDED) {
break;
} else {
try {