You are viewing a plain-text version of this content. (The link to the canonical version was lost when this copy was converted to plain text.)
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/27 00:25:55 UTC
[02/50] [abbrv] kylin git commit: Revert "KYLIN-1726 use segment uuid instead of name"
Revert "KYLIN-1726 use segment uuid instead of name"
This reverts commit 42dafc15db40731582d6257c618eff29643930a8.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1f488047
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1f488047
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1f488047
Branch: refs/heads/1.5.x-HBase1.x
Commit: 1f4880479cd3132786062723ba70312440de4805
Parents: dee8f2d
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Sep 19 23:51:57 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Tue Sep 20 11:43:08 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 5 +---
.../kylin/provision/BuildCubeWithStream.java | 26 +++-----------------
.../apache/kylin/source/kafka/KafkaMRInput.java | 2 +-
.../source/kafka/hadoop/KafkaFlatTableJob.java | 11 ++++++---
.../kafka/hadoop/KafkaInputRecordReader.java | 9 +++----
5 files changed, 17 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f488047/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index fc68798..daeca0d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -444,11 +444,8 @@ public class CubeManager implements IRealizationProvider {
updateCube(cubeBuilder);
return newSegment;
}
- public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException {
- return refreshSegment(cube, startDate, endDate, startOffset, endOffset, true);
- }
- public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, boolean strictChecking) throws IOException {
+ public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException {
checkNoBuildingSegment(cube);
CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset);
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f488047/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 9e779ab..7f79acc 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -21,8 +21,6 @@ package org.apache.kylin.provision;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
-import java.util.HashMap;
-import java.util.List;
import java.util.TimeZone;
import java.util.UUID;
@@ -147,34 +145,18 @@ public class BuildCubeWithStream {
//merge
mergeSegment(cubeName, 0, 15000);
- List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments();
- Assert.assertTrue(segments.size() == 1);
-
- CubeSegment toRefreshSeg = segments.get(0);
- HashMap<String, String> partitionOffsetMap = toRefreshSeg.getAdditionalInfo();
-
- refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd(), partitionOffsetMap);
- segments = cubeManager.getCube(cubeName).getSegments();
- Assert.assertTrue(segments.size() == 1);
-
}
private String mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception {
- CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
+ CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, true);
DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
return job.getId();
}
- private String refreshSegment(String cubeName, long startOffset, long endOffset, HashMap<String, String> partitionOffsetMap) throws Exception {
- CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
- segment.setAdditionalInfo(partitionOffsetMap);
- CubeInstance cubeInstance = cubeManager.getCube(cubeName);
- CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
- cubeBuilder.setToUpdateSegs(segment);
- cubeManager.updateCube(cubeBuilder);
- segment = cubeManager.getCube(cubeName).getSegmentById(segment.getUuid());
+ private String refreshSegment(String cubeName, long startOffset, long endOffset) throws Exception {
+ CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
@@ -182,7 +164,7 @@ public class BuildCubeWithStream {
}
private String buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
- CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
+ CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f488047/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index a5f678f..cfce137 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -165,7 +165,7 @@ public class KafkaMRInput implements IMRInput {
jobBuilderSupport.appendMapReduceParameters(cmd);
JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
- JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+ JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
result.setMapReduceParams(cmd.toString());
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f488047/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index 87d2471..decfb60 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -33,6 +33,7 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
@@ -69,14 +70,14 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_SEGMENT_NAME);
parseOptions(options, args);
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
String cubeName = getOptionValue(OPTION_CUBE_NAME);
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- String segmentId = getOptionValue(OPTION_SEGMENT_ID);
+ String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
// ----------------------------------------------------------------------------
// add metadata to distributed cache
@@ -84,7 +85,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
CubeInstance cube = cubeMgr.getCube(cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentId);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
logger.info("Starting: " + job.getJobName());
setJobClasspath(job, cube.getConfig());
@@ -103,9 +104,11 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize()));
job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
- setupMapper(cube.getSegmentById(segmentId));
+ setupMapper(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
job.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(job, output);
FileOutputFormat.setCompressOutput(job, true);
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f488047/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index 6774c9d..f67fef5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -105,11 +105,6 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
value = new BytesWritable();
}
- if (watermark >= latestOffset) {
- log.info("Reach the end offset, stop reading.");
- return false;
- }
-
if (messages == null) {
log.info("{} fetching offset {} ", topic + ":" + split.getBrokers() + ":" + partition, watermark);
TopicPartition topicPartition = new TopicPartition(topic, partition);
@@ -124,6 +119,10 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
if (iterator.hasNext()) {
ConsumerRecord<String, String> message = iterator.next();
+ if (message.offset() >= latestOffset) {
+ log.info("Reach the end offset, stop reading.");
+ return false;
+ }
key.set(message.offset());
byte[] valuebytes = Bytes.toBytes(message.value());
value.set(valuebytes, 0, valuebytes.length);