You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/01/21 12:02:19 UTC

[09/13] kylin git commit: KYLIN-3177, fix tsRange null issue

KYLIN-3177, fix tsRange null issue


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/27ccf441
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/27ccf441
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/27ccf441

Branch: refs/heads/KYLIN-2881-review
Commit: 27ccf441c64ca28a0dfe86ced6ea8b56b88b02df
Parents: c3d1745
Author: Cheng Wang <ch...@kyligence.io>
Authored: Thu Jan 18 17:33:35 2018 +0800
Committer: Cheng Wang <ch...@kyligence.io>
Committed: Thu Jan 18 17:40:02 2018 +0800

----------------------------------------------------------------------
 .../mr/steps/UpdateCubeInfoAfterMergeStep.java       | 14 +++++++++++---
 .../apache/kylin/provision/BuildCubeWithStream.java  | 15 +++++++++++----
 2 files changed, 22 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/27ccf441/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index 3185bec..018abab 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -31,11 +31,10 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.SegmentRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- */
 public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
 
     private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterMergeStep.class);
@@ -66,10 +65,15 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         }
         long sourceCount = 0L;
         long sourceSize = 0L;
+
+        boolean isOffsetCube = mergedSegment.isOffsetCube();
+        Long tsStartMin = Long.MAX_VALUE, tsEndMax = 0L;
         for (String id : mergingSegmentIds) {
             CubeSegment segment = cube.getSegmentById(id);
             sourceCount += segment.getInputRecords();
             sourceSize += segment.getInputRecordsSize();
+            tsStartMin = Math.min(tsStartMin, segment.getTSRange().start.v);
+            tsEndMax = Math.max(tsEndMax, segment.getTSRange().end.v);
         }
 
         // update segment info
@@ -79,6 +83,11 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         mergedSegment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
         mergedSegment.setLastBuildTime(System.currentTimeMillis());
 
+        if (isOffsetCube == true) {
+            SegmentRange.TSRange tsRange = new SegmentRange.TSRange(tsStartMin, tsEndMax);
+            mergedSegment.setTSRange(tsRange);
+        }
+
         try {
             cubeManager.promoteNewlyBuiltSegments(cube, mergedSegment);
             return new ExecuteResult(ExecuteResult.State.SUCCEED);
@@ -87,5 +96,4 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
             return ExecuteResult.createError(e);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/27ccf441/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index f9277bc..f7b8275 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -232,7 +232,8 @@ public class BuildCubeWithStream {
         for (int i = 0; i < futures.size(); i++) {
             ExecutableState result = futures.get(i).get(20, TimeUnit.MINUTES);
             logger.info("Checking building task " + i + " whose state is " + result);
-            Assert.assertTrue(result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED);
+            Assert.assertTrue(
+                    result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED);
             if (result == ExecutableState.SUCCEED)
                 succeedBuild++;
         }
@@ -250,6 +251,9 @@ public class BuildCubeWithStream {
             segments = cubeManager.getCube(cubeName).getSegments();
             Assert.assertTrue(segments.size() == 1);
 
+            SegmentRange.TSRange tsRange = segments.get(0).getTSRange();
+            Assert.assertTrue(tsRange.duration() > 0);
+
             CubeSegment toRefreshSeg = segments.get(0);
 
             refreshSegment(cubeName, toRefreshSeg.getSegRange());
@@ -279,7 +283,8 @@ public class BuildCubeWithStream {
     protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
         CubeInstance cubeInstance = cubeManager.getCube(cubeName);
         ISource source = SourceFactory.getSource(cubeInstance);
-        SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance, new SourcePartition(null, new SegmentRange(startOffset, endOffset), null, null));
+        SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance,
+                new SourcePartition(null, new SegmentRange(startOffset, endOffset), null, null));
         CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), partition);
         DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);
@@ -298,7 +303,8 @@ public class BuildCubeWithStream {
         ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
         System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
-            throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169");
+            throw new RuntimeException(
+                    "No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169");
         }
         HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
     }
@@ -326,7 +332,8 @@ public class BuildCubeWithStream {
     protected void waitForJob(String jobId) {
         while (true) {
             AbstractExecutable job = jobService.getJob(jobId);
-            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR || job.getStatus() == ExecutableState.DISCARDED) {
+            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR
+                    || job.getStatus() == ExecutableState.DISCARDED) {
                 break;
             } else {
                 try {