Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/04 08:31:53 UTC

[08/30] kylin git commit: Revert "Revert "KYLIN-1726 use segment uuid instead of name""

Revert "Revert "KYLIN-1726 use segment uuid instead of name""

This reverts commit 1f4880479cd3132786062723ba70312440de4805.


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f89e35f6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f89e35f6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f89e35f6

Branch: refs/heads/master-hbase1.x
Commit: f89e35f6309c9bec43cc16e68fb35b7490aecc38
Parents: 8431af4
Author: shaofengshi <sh...@apache.org>
Authored: Sat Sep 24 14:56:17 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 27 10:17:40 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java |  5 +++-
 .../kylin/provision/BuildCubeWithStream.java    | 26 +++++++++++++++++---
 .../apache/kylin/source/kafka/KafkaMRInput.java |  2 +-
 .../source/kafka/hadoop/KafkaFlatTableJob.java  | 11 +++------
 .../kafka/hadoop/KafkaInputRecordReader.java    |  9 ++++---
 5 files changed, 36 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f89e35f6/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 2fadedb..cc2baa5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -479,8 +479,11 @@ public class CubeManager implements IRealizationProvider {
         updateCube(cubeBuilder);
         return newSegment;
     }
-
     public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException {
+        return refreshSegment(cube, startDate, endDate, startOffset, endOffset, true);
+    }
+
+    public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, boolean strictChecking) throws IOException {
         checkNoBuildingSegment(cube);
 
         CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset);

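For context: the hunk above turns the original five-argument refreshSegment() into a wrapper that delegates with strictChecking = true, so existing callers keep the old strict behavior while streaming callers can opt out through the new six-argument overload. A minimal sketch of calling the new overload (assumes a live Kylin environment; the cube name and the 0/15000 offsets are illustrative values borrowed from the test below):

    import java.io.IOException;

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.cube.CubeManager;
    import org.apache.kylin.cube.CubeSegment;

    public class RefreshSegmentSketch {
        public static void main(String[] args) throws IOException {
            CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
            CubeInstance cube = cubeManager.getCube("test_streaming_cube"); // hypothetical cube name

            // Streaming segments are addressed by source offsets, so the date
            // range is 0/0 and strict checking is turned off; the original
            // five-argument overload still delegates with strictChecking = true.
            CubeSegment segment = cubeManager.refreshSegment(cube, 0, 0, 0, 15000, false);
            System.out.println("refreshing segment " + segment.getUuid());
        }
    }
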
http://git-wip-us.apache.org/repos/asf/kylin/blob/f89e35f6/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 7f79acc..9e779ab 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -21,6 +21,8 @@ package org.apache.kylin.provision;
 import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.List;
 import java.util.TimeZone;
 import java.util.UUID;
 
@@ -145,18 +147,34 @@ public class BuildCubeWithStream {
         //merge
         mergeSegment(cubeName, 0, 15000);
 
+        List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments();
+        Assert.assertTrue(segments.size() == 1);
+
+        CubeSegment toRefreshSeg = segments.get(0);
+        HashMap<String, String> partitionOffsetMap = toRefreshSeg.getAdditionalInfo();
+
+        refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd(), partitionOffsetMap);
+        segments = cubeManager.getCube(cubeName).getSegments();
+        Assert.assertTrue(segments.size() == 1);
+
     }
 
     private String mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception {
-        CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, true);
+        CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
         DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());
         return job.getId();
     }
 
-    private String refreshSegment(String cubeName, long startOffset, long endOffset) throws Exception {
-        CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
+    private String refreshSegment(String cubeName, long startOffset, long endOffset, HashMap<String, String> partitionOffsetMap) throws Exception {
+        CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
+        segment.setAdditionalInfo(partitionOffsetMap);
+        CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+        CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
+        cubeBuilder.setToUpdateSegs(segment);
+        cubeManager.updateCube(cubeBuilder);
+        segment = cubeManager.getCube(cubeName).getSegmentById(segment.getUuid());
         DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());
@@ -164,7 +182,7 @@ public class BuildCubeWithStream {
     }
 
     private String buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
-        CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
+        CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
         DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());

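The test changes above exercise the new path end to end: merge down to a single segment, copy its Kafka partition offsets out of the additional-info map, then refresh that exact offset range. Note the re-fetch via getSegmentById() after updateCube(), presumably because persisting the update refreshes the cached cube and the in-memory segment reference may be stale. A condensed sketch of that refresh flow, assuming a cubeManager initialized as in BuildCubeWithStream:

    import java.io.IOException;
    import java.util.HashMap;

    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.cube.CubeManager;
    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.cube.CubeUpdate;

    public class RefreshWithOffsetsSketch {
        // Condensed version of the refresh flow in the test above.
        static CubeSegment refreshKeepingOffsets(CubeManager cubeManager, String cubeName,
                                                 CubeSegment toRefresh) throws IOException {
            HashMap<String, String> partitionOffsetMap = new HashMap<>(toRefresh.getAdditionalInfo());

            CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName),
                    0, 0, toRefresh.getSourceOffsetStart(), toRefresh.getSourceOffsetEnd(), false);
            segment.setAdditionalInfo(partitionOffsetMap);

            CubeUpdate cubeBuilder = new CubeUpdate(cubeManager.getCube(cubeName));
            cubeBuilder.setToUpdateSegs(segment);
            cubeManager.updateCube(cubeBuilder);

            // Persisting the update refreshes the cached cube, so look the
            // segment up again by its immutable UUID rather than reusing the
            // possibly stale object.
            return cubeManager.getCube(cubeName).getSegmentById(segment.getUuid());
        }
    }
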
http://git-wip-us.apache.org/repos/asf/kylin/blob/f89e35f6/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index cfce137..a5f678f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -165,7 +165,7 @@ public class KafkaMRInput implements IMRInput {
             jobBuilderSupport.appendMapReduceParameters(cmd);
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
 
             result.setMapReduceParams(cmd.toString());

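This one-line change is the heart of KYLIN-1726: the flat-table step now passes the segment's immutable UUID on the command line instead of its name. A segment name is typically derived from its range, so refreshing the same offset range can yield two segments with colliding names, while each keeps a distinct UUID. A toy illustration (all values hypothetical):

    import java.util.UUID;

    public class SegmentIdentitySketch {
        public static void main(String[] args) {
            // A refresh recreates the segment over the same offset range, so a
            // range-derived name collides while the UUIDs stay distinct.
            String name = "0_15000";                            // same for old and new segment
            String oldSegmentId = UUID.randomUUID().toString(); // unique per segment
            String newSegmentId = UUID.randomUUID().toString();
            System.out.println(name + ": " + oldSegmentId + " vs " + newSegmentId);
        }
    }
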
http://git-wip-us.apache.org/repos/asf/kylin/blob/f89e35f6/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index decfb60..87d2471 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -33,7 +33,6 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
@@ -70,14 +69,14 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             options.addOption(OPTION_JOB_NAME);
             options.addOption(OPTION_CUBE_NAME);
             options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_SEGMENT_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
             parseOptions(options, args);
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             String cubeName = getOptionValue(OPTION_CUBE_NAME);
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
 
-            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+            String segmentId = getOptionValue(OPTION_SEGMENT_ID);
 
             // ----------------------------------------------------------------------------
             // add metadata to distributed cache
@@ -85,7 +84,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             CubeInstance cube = cubeMgr.getCube(cubeName);
 
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentId);
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());
@@ -104,11 +103,9 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
             job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize()));
             job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
             job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
             job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
-            setupMapper(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
+            setupMapper(cube.getSegmentById(segmentId));
             job.setNumReduceTasks(0);
             FileOutputFormat.setOutputPath(job, output);
             FileOutputFormat.setCompressOutput(job, true);

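The job side makes the symmetric change: the command-line option, the Hadoop configuration key, and the segment lookup all move from name to UUID, and two duplicated configuration lines are dropped. The new lookup also no longer depends on segment status, where the old one matched by name plus SegmentStatusEnum.NEW. A minimal sketch of the lookup this hunk switches to, with cubeName and segmentId corresponding to the parsed options above:

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.cube.CubeManager;
    import org.apache.kylin.cube.CubeSegment;

    public class SegmentLookupSketch {
        // Hypothetical helper contrasting the two lookups.
        static CubeSegment findSegment(String cubeName, String segmentId) {
            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
            CubeInstance cube = cubeMgr.getCube(cubeName);
            // Before: cube.getSegment(segmentName, SegmentStatusEnum.NEW), i.e.
            // match by name plus status; after: by UUID, independent of status.
            return cube.getSegmentById(segmentId);
        }
    }
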
http://git-wip-us.apache.org/repos/asf/kylin/blob/f89e35f6/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index f67fef5..6774c9d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -105,6 +105,11 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
             value = new BytesWritable();
         }
 
+        if (watermark >= latestOffset) {
+            log.info("Reach the end offset, stop reading.");
+            return false;
+        }
+
         if (messages == null) {
             log.info("{} fetching offset {} ", topic + ":" + split.getBrokers() + ":" + partition, watermark);
             TopicPartition topicPartition = new TopicPartition(topic, partition);
@@ -119,10 +124,6 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
 
         if (iterator.hasNext()) {
             ConsumerRecord<String, String> message = iterator.next();
-            if (message.offset() >= latestOffset) {
-                log.info("Reach the end offset, stop reading.");
-                return false;
-            }
             key.set(message.offset());
             byte[] valuebytes = Bytes.toBytes(message.value());
             value.set(valuebytes, 0, valuebytes.length);
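
The record-reader fix moves the end-offset check ahead of the fetch: previously the bound was only tested on the offset of an already-fetched message, so once the watermark reached latestOffset the reader would still issue at least one more fetch before stopping. Checking the watermark first returns cleanly without that extra round trip. The same bounded-read pattern with a plain Kafka consumer, as a standalone sketch (assumes a recent kafka-clients on the classpath; broker, topic, and offsets are hypothetical):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class BoundedReadSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("enable.auto.commit", "false");

            long watermark = 0L;         // next offset to read
            long latestOffset = 15000L;  // exclusive end offset, as in the reader

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition partition = new TopicPartition("test_topic", 0); // hypothetical topic
                consumer.assign(Collections.singletonList(partition));
                consumer.seek(partition, watermark);

                // Test the bound BEFORE fetching, as the patched reader does,
                // so no extra fetch is issued once the end offset is reached.
                while (watermark < latestOffset) {
                    for (ConsumerRecord<String, String> message : consumer.poll(Duration.ofMillis(1000))) {
                        if (message.offset() >= latestOffset) {
                            return; // a fetched batch may still overshoot the bound
                        }
                        watermark = message.offset() + 1;
                        System.out.println(message.offset() + ": " + message.value());
                    }
                }
            }
        }
    }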