You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/27 02:19:51 UTC
[19/28] kylin git commit: Revert "Revert "KYLIN-1726 use segment uuid
instead of name""
Revert "Revert "KYLIN-1726 use segment uuid instead of name""
This reverts commit 1f4880479cd3132786062723ba70312440de4805.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f89e35f6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f89e35f6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f89e35f6
Branch: refs/heads/KYLIN-1726-2
Commit: f89e35f6309c9bec43cc16e68fb35b7490aecc38
Parents: 8431af4
Author: shaofengshi <sh...@apache.org>
Authored: Sat Sep 24 14:56:17 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 27 10:17:40 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 5 +++-
.../kylin/provision/BuildCubeWithStream.java | 26 +++++++++++++++++---
.../apache/kylin/source/kafka/KafkaMRInput.java | 2 +-
.../source/kafka/hadoop/KafkaFlatTableJob.java | 11 +++------
.../kafka/hadoop/KafkaInputRecordReader.java | 9 ++++---
5 files changed, 36 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/f89e35f6/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 2fadedb..cc2baa5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -479,8 +479,11 @@ public class CubeManager implements IRealizationProvider {
updateCube(cubeBuilder);
return newSegment;
}
-
public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException {
+ return refreshSegment(cube, startDate, endDate, startOffset, endOffset, true);
+ }
+
+ public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, boolean strictChecking) throws IOException {
checkNoBuildingSegment(cube);
CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset);
http://git-wip-us.apache.org/repos/asf/kylin/blob/f89e35f6/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 7f79acc..9e779ab 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -21,6 +21,8 @@ package org.apache.kylin.provision;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.List;
import java.util.TimeZone;
import java.util.UUID;
@@ -145,18 +147,34 @@ public class BuildCubeWithStream {
//merge
mergeSegment(cubeName, 0, 15000);
+ List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments();
+ Assert.assertTrue(segments.size() == 1);
+
+ CubeSegment toRefreshSeg = segments.get(0);
+ HashMap<String, String> partitionOffsetMap = toRefreshSeg.getAdditionalInfo();
+
+ refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd(), partitionOffsetMap);
+ segments = cubeManager.getCube(cubeName).getSegments();
+ Assert.assertTrue(segments.size() == 1);
+
}
private String mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception {
- CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, true);
+ CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
return job.getId();
}
- private String refreshSegment(String cubeName, long startOffset, long endOffset) throws Exception {
- CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
+ private String refreshSegment(String cubeName, long startOffset, long endOffset, HashMap<String, String> partitionOffsetMap) throws Exception {
+ CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
+ segment.setAdditionalInfo(partitionOffsetMap);
+ CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+ CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
+ cubeBuilder.setToUpdateSegs(segment);
+ cubeManager.updateCube(cubeBuilder);
+ segment = cubeManager.getCube(cubeName).getSegmentById(segment.getUuid());
DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
@@ -164,7 +182,7 @@ public class BuildCubeWithStream {
}
private String buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
- CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
+ CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
http://git-wip-us.apache.org/repos/asf/kylin/blob/f89e35f6/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index cfce137..a5f678f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -165,7 +165,7 @@ public class KafkaMRInput implements IMRInput {
jobBuilderSupport.appendMapReduceParameters(cmd);
JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
- JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
result.setMapReduceParams(cmd.toString());
http://git-wip-us.apache.org/repos/asf/kylin/blob/f89e35f6/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index decfb60..87d2471 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -33,7 +33,6 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
@@ -70,14 +69,14 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_SEGMENT_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
parseOptions(options, args);
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
String cubeName = getOptionValue(OPTION_CUBE_NAME);
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+ String segmentId = getOptionValue(OPTION_SEGMENT_ID);
// ----------------------------------------------------------------------------
// add metadata to distributed cache
@@ -85,7 +84,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
CubeInstance cube = cubeMgr.getCube(cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentId);
logger.info("Starting: " + job.getJobName());
setJobClasspath(job, cube.getConfig());
@@ -104,11 +103,9 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize()));
job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
- job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
- setupMapper(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
+ setupMapper(cube.getSegmentById(segmentId));
job.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(job, output);
FileOutputFormat.setCompressOutput(job, true);
http://git-wip-us.apache.org/repos/asf/kylin/blob/f89e35f6/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index f67fef5..6774c9d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -105,6 +105,11 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
value = new BytesWritable();
}
+ if (watermark >= latestOffset) {
+ log.info("Reach the end offset, stop reading.");
+ return false;
+ }
+
if (messages == null) {
log.info("{} fetching offset {} ", topic + ":" + split.getBrokers() + ":" + partition, watermark);
TopicPartition topicPartition = new TopicPartition(topic, partition);
@@ -119,10 +124,6 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
if (iterator.hasNext()) {
ConsumerRecord<String, String> message = iterator.next();
- if (message.offset() >= latestOffset) {
- log.info("Reach the end offset, stop reading.");
- return false;
- }
key.set(message.offset());
byte[] valuebytes = Bytes.toBytes(message.value());
value.set(valuebytes, 0, valuebytes.length);