Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/10 05:38:03 UTC
[6/6] kylin git commit: KYLIN-1726 package rename
KYLIN-1726 package rename
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c67fa740
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c67fa740
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c67fa740
Branch: refs/heads/master
Commit: c67fa740d364d372c7a6424fd160570cc7e890c4
Parents: 5aee022
Author: shaofengshi <sh...@apache.org>
Authored: Sun Oct 9 13:24:59 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 10 13:32:44 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/source/kafka/KafkaMRInput.java | 3 +
.../kylin/source/kafka/MergeOffsetStep.java | 80 -----------
.../kylin/source/kafka/SeekOffsetStep.java | 140 ------------------
.../source/kafka/StringStreamingParser.java | 51 -------
.../apache/kylin/source/kafka/TopicMeta.java | 46 ------
.../kylin/source/kafka/UpdateTimeRangeStep.java | 117 ---------------
.../kylin/source/kafka/job/MergeOffsetStep.java | 80 +++++++++++
.../kylin/source/kafka/job/SeekOffsetStep.java | 141 +++++++++++++++++++
.../source/kafka/job/UpdateTimeRangeStep.java | 117 +++++++++++++++
.../kafka/util/ByteBufferBackedInputStream.java | 6 +-
10 files changed, 344 insertions(+), 437 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 6358ee1..4d1f5c9 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -43,6 +43,9 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.job.MergeOffsetStep;
+import org.apache.kylin.source.kafka.job.SeekOffsetStep;
+import org.apache.kylin.source.kafka.job.UpdateTimeRangeStep;
import javax.annotation.Nullable;
import java.io.IOException;
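
Note: the net effect for callers is that the three job-step classes keep their names and move one package down, from org.apache.kylin.source.kafka to org.apache.kylin.source.kafka.job. A minimal sketch of what dependent code sees (the caller class below is hypothetical; only the three imports are taken from this commit):

    // Hypothetical caller showing the only change KYLIN-1726 asks of
    // dependent code: the new ".job" segment in these imports.
    // (Old location, now gone: org.apache.kylin.source.kafka.MergeOffsetStep)
    import org.apache.kylin.source.kafka.job.MergeOffsetStep;
    import org.apache.kylin.source.kafka.job.SeekOffsetStep;
    import org.apache.kylin.source.kafka.job.UpdateTimeRangeStep;

    public class RenameCheck {
        public static void main(String[] args) {
            // avoids instantiation; just confirms where the class now lives
            System.out.println(MergeOffsetStep.class.getName());
            // prints: org.apache.kylin.source.kafka.job.MergeOffsetStep
        }
    }
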
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
deleted file mode 100644
index 18c959a..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class MergeOffsetStep extends AbstractExecutable {
-
- private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class);
- public MergeOffsetStep() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
- final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
- final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-
- List<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
-
- Collections.sort(mergingSegs);
-
- final CubeSegment first = mergingSegs.get(0);
- final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
-
- segment.setSourceOffsetStart(first.getSourceOffsetStart());
- segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
- segment.setSourceOffsetEnd(last.getSourceOffsetEnd());
- segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
-
- long dateRangeStart = CubeManager.minDateRangeStart(mergingSegs);
- long dateRangeEnd = CubeManager.maxDateRangeEnd(mergingSegs);
-
- segment.setDateRangeStart(dateRangeStart);
- segment.setDateRangeEnd(dateRangeEnd);
-
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToUpdateSegs(segment);
- try {
- cubeManager.updateCube(cubeBuilder);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } catch (IOException e) {
- logger.error("fail to update cube segment offset", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
deleted file mode 100644
index 151b912..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import org.apache.kylin.source.kafka.util.KafkaClient;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class SeekOffsetStep extends AbstractExecutable {
-
- private static final Logger logger = LoggerFactory.getLogger(SeekOffsetStep.class);
-
- public SeekOffsetStep() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
- final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
- final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-
- Map<Integer, Long> startOffsets = segment.getSourcePartitionOffsetStart();
- Map<Integer, Long> endOffsets = segment.getSourcePartitionOffsetEnd();
-
- if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) {
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided.");
- }
-
- final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable());
- final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
- final String topic = kafakaConfig.getTopic();
- try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
- final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
-
- if (startOffsets.isEmpty()) {
- // user didn't specify start offset, use the biggest offset in existing segments as start
- for (CubeSegment seg : cube.getSegments()) {
- Map<Integer, Long> segEndOffset = seg.getSourcePartitionOffsetEnd();
- for (PartitionInfo partition : partitionInfos) {
- int partitionId = partition.partition();
- if (segEndOffset.containsKey(partitionId)) {
- startOffsets.put(partitionId, Math.max(startOffsets.containsKey(partitionId) ? startOffsets.get(partitionId) : 0, segEndOffset.get(partitionId)));
- }
- }
- }
-
- if (partitionInfos.size() > startOffsets.size()) {
- // has new partition added
- for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
- long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
- startOffsets.put(partitionInfos.get(x).partition(), earliest);
- }
- }
-
- logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
- }
-
- if (endOffsets.isEmpty()) {
- // user didn't specify end offset, use latest offset in kafka
- for (PartitionInfo partitionInfo : partitionInfos) {
- long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition());
- endOffsets.put(partitionInfo.partition(), latest);
- }
-
- logger.info("Get end offset for segment " + segment.getName() + ": " + endOffsets.toString());
- }
- }
-
- long totalStartOffset = 0, totalEndOffset = 0;
- for (Long v : startOffsets.values()) {
- totalStartOffset += v;
- }
- for (Long v : endOffsets.values()) {
- totalEndOffset += v;
- }
-
- if (totalEndOffset > totalStartOffset) {
- segment.setSourceOffsetStart(totalStartOffset);
- segment.setSourceOffsetEnd(totalEndOffset);
- segment.setSourcePartitionOffsetStart(startOffsets);
- segment.setSourcePartitionOffsetEnd(endOffsets);
- segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset));
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToUpdateSegs(segment);
- try {
- cubeManager.updateCube(cubeBuilder);
- } catch (IOException e) {
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset + ", message count: " + (totalEndOffset - totalStartOffset));
- } else {
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToRemoveSegs(segment);
- try {
- cubeManager.updateCube(cubeBuilder);
- } catch (IOException e) {
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
-
- return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new message comes");
- }
-
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
deleted file mode 100644
index f74df83..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public final class StringStreamingParser extends StreamingParser {
-
- public static final StringStreamingParser instance = new StringStreamingParser(null, null);
-
- private StringStreamingParser(List<TblColRef> allColumns, Map<String, String> properties) {
- }
-
- @Override
- public StreamingMessage parse(ByteBuffer message) {
- byte[] bytes = new byte[message.limit()];
- message.get(bytes);
- return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), 0, 0, Collections.<String, Object> emptyMap());
- }
-
- @Override
- public boolean filter(StreamingMessage streamingMessage) {
- return true;
- }
-}
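
Note: StringStreamingParser is deleted outright; the diffstat at the top shows no replacement for it. Its parse() did nothing more than split the raw message bytes on commas. A self-contained restatement of that behavior (the sample payload and the explicit UTF-8 charset are assumptions; the deleted code used the platform default charset):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class StringParseSketch {
        public static void main(String[] args) {
            ByteBuffer message = ByteBuffer.wrap(
                    "2016-10-09,click,42".getBytes(StandardCharsets.UTF_8));
            // drain the buffer into a byte array, as the deleted parser did
            byte[] bytes = new byte[message.limit()];
            message.get(bytes);
            // same comma split the deleted parse() performed
            System.out.println(Arrays.asList(
                    new String(bytes, StandardCharsets.UTF_8).split(",")));
            // prints: [2016-10-09, click, 42]
        }
    }
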
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
deleted file mode 100644
index a73543e..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * The topic metadata should be invariant, otherwise will cause re-initialization of the Consumer
- *
- */
-public class TopicMeta {
-
- private final String name;
-
- private final List<Integer> partitionIds;
-
- public TopicMeta(String name, List<Integer> partitionIds) {
- this.name = name;
- this.partitionIds = Collections.unmodifiableList(partitionIds);
- }
-
- public String getName() {
- return name;
- }
-
- public List<Integer> getPartitionIds() {
- return partitionIds;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
deleted file mode 100644
index 9e902d8..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class UpdateTimeRangeStep extends AbstractExecutable {
-
- private static final Logger logger = LoggerFactory.getLogger(UpdateTimeRangeStep.class);
-
- public UpdateTimeRangeStep() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
- final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
- final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
- final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
- final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
- final Path outputFile = new Path(outputPath, partitionCol.getName());
-
- String minValue = null, maxValue = null, currentValue = null;
- FSDataInputStream inputStream = null;
- BufferedReader bufferedReader = null;
- try {
- FileSystem fs = HadoopUtil.getFileSystem(outputPath);
- inputStream = fs.open(outputFile);
- bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
- minValue = currentValue = bufferedReader.readLine();
- while (currentValue != null) {
- maxValue = currentValue;
- currentValue = bufferedReader.readLine();
- }
- } catch (IOException e) {
- logger.error("fail to read file " + outputFile, e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- } finally {
- IOUtils.closeQuietly(bufferedReader);
- IOUtils.closeQuietly(inputStream);
- }
-
- final DataType partitionColType = partitionCol.getType();
- FastDateFormat dateFormat;
- if (partitionColType.isDate()) {
- dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
- } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
- dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
- } else if (partitionColType.isStringFamily()) {
- String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
- if (StringUtils.isEmpty(partitionDateFormat)) {
- partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
- }
- dateFormat = DateFormat.getDateFormat(partitionDateFormat);
- } else {
- return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type");
- }
-
- try {
- long startTime = dateFormat.parse(minValue).getTime();
- long endTime = dateFormat.parse(maxValue).getTime();
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- segment.setDateRangeStart(startTime);
- segment.setDateRangeEnd(endTime);
- cubeBuilder.setToUpdateSegs(segment);
- cubeManager.updateCube(cubeBuilder);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } catch (Exception e) {
- logger.error("fail to update cube segment offset", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
new file mode 100644
index 0000000..9cadd72
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka.job;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class MergeOffsetStep extends AbstractExecutable {
+
+ private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class);
+ public MergeOffsetStep() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+ final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+ final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+ List<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
+
+ Collections.sort(mergingSegs);
+
+ final CubeSegment first = mergingSegs.get(0);
+ final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
+
+ segment.setSourceOffsetStart(first.getSourceOffsetStart());
+ segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
+ segment.setSourceOffsetEnd(last.getSourceOffsetEnd());
+ segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
+
+ long dateRangeStart = CubeManager.minDateRangeStart(mergingSegs);
+ long dateRangeEnd = CubeManager.maxDateRangeEnd(mergingSegs);
+
+ segment.setDateRangeStart(dateRangeStart);
+ segment.setDateRangeEnd(dateRangeEnd);
+
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToUpdateSegs(segment);
+ try {
+ cubeManager.updateCube(cubeBuilder);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ } catch (IOException e) {
+ logger.error("fail to update cube segment offset", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+ }
+
+}
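
Note: this is the deleted MergeOffsetStep recreated under the new package, unchanged apart from the package declaration (and license-comment whitespace). To make the offset stitching concrete, a minimal sketch with hypothetical segments and a stand-in for CubeSegment: after a natural-order sort, the merged segment takes the first segment's start and the last segment's end.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class MergeOffsetSketch {
        // hypothetical stand-in for CubeSegment's offset fields
        static class Seg implements Comparable<Seg> {
            final long start, end;
            Seg(long start, long end) { this.start = start; this.end = end; }
            @Override
            public int compareTo(Seg o) { return Long.compare(start, o.start); }
        }

        public static void main(String[] args) {
            List<Seg> mergingSegs = Arrays.asList(new Seg(100, 250), new Seg(0, 100));
            Collections.sort(mergingSegs);                 // natural order, as in the step
            Seg first = mergingSegs.get(0);
            Seg last = mergingSegs.get(mergingSegs.size() - 1);
            // the merged segment spans first's start to last's end
            System.out.println(first.start + " .. " + last.end);  // 0 .. 250
        }
    }
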
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
new file mode 100644
index 0000000..5751095
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka.job;
+
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.util.KafkaClient;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class SeekOffsetStep extends AbstractExecutable {
+
+ private static final Logger logger = LoggerFactory.getLogger(SeekOffsetStep.class);
+
+ public SeekOffsetStep() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+ final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+ final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+ Map<Integer, Long> startOffsets = segment.getSourcePartitionOffsetStart();
+ Map<Integer, Long> endOffsets = segment.getSourcePartitionOffsetEnd();
+
+ if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) {
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided.");
+ }
+
+ final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable());
+ final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
+ final String topic = kafakaConfig.getTopic();
+ try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
+ final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+
+ if (startOffsets.isEmpty()) {
+ // user didn't specify start offset, use the biggest offset in existing segments as start
+ for (CubeSegment seg : cube.getSegments()) {
+ Map<Integer, Long> segEndOffset = seg.getSourcePartitionOffsetEnd();
+ for (PartitionInfo partition : partitionInfos) {
+ int partitionId = partition.partition();
+ if (segEndOffset.containsKey(partitionId)) {
+ startOffsets.put(partitionId, Math.max(startOffsets.containsKey(partitionId) ? startOffsets.get(partitionId) : 0, segEndOffset.get(partitionId)));
+ }
+ }
+ }
+
+ if (partitionInfos.size() > startOffsets.size()) {
+ // has new partition added
+ for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
+ long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
+ startOffsets.put(partitionInfos.get(x).partition(), earliest);
+ }
+ }
+
+ logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
+ }
+
+ if (endOffsets.isEmpty()) {
+ // user didn't specify end offset, use latest offset in kafka
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition());
+ endOffsets.put(partitionInfo.partition(), latest);
+ }
+
+ logger.info("Get end offset for segment " + segment.getName() + ": " + endOffsets.toString());
+ }
+ }
+
+ long totalStartOffset = 0, totalEndOffset = 0;
+ for (Long v : startOffsets.values()) {
+ totalStartOffset += v;
+ }
+ for (Long v : endOffsets.values()) {
+ totalEndOffset += v;
+ }
+
+ if (totalEndOffset > totalStartOffset) {
+ segment.setSourceOffsetStart(totalStartOffset);
+ segment.setSourceOffsetEnd(totalEndOffset);
+ segment.setSourcePartitionOffsetStart(startOffsets);
+ segment.setSourcePartitionOffsetEnd(endOffsets);
+ segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset));
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToUpdateSegs(segment);
+ try {
+ cubeManager.updateCube(cubeBuilder);
+ } catch (IOException e) {
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset + ", message count: " + (totalEndOffset - totalStartOffset));
+ } else {
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToRemoveSegs(segment);
+ try {
+ cubeManager.updateCube(cubeBuilder);
+ } catch (IOException e) {
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+
+ return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new message comes");
+ }
+
+
+ }
+
+}
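
Note: relative to the deleted copy this file gains one line, an explicit import of KafkaConfigManager, which the class previously reached without an import from inside org.apache.kylin.source.kafka (hence 141 lines added versus 140 removed). The start-offset rule above, namely that when no start is supplied each partition resumes from the largest end offset any existing segment has reached, restated as a self-contained sketch with hypothetical offsets:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SeekOffsetSketch {
        public static void main(String[] args) {
            // end offsets already covered by two existing segments (hypothetical)
            Map<Integer, Long> segA = new HashMap<>();
            segA.put(0, 100L); segA.put(1, 80L);
            Map<Integer, Long> segB = new HashMap<>();
            segB.put(0, 150L); segB.put(1, 90L);
            List<Map<Integer, Long>> segmentEndOffsets = Arrays.asList(segA, segB);

            Map<Integer, Long> startOffsets = new HashMap<>();
            for (Map<Integer, Long> segEnd : segmentEndOffsets) {
                for (Map.Entry<Integer, Long> e : segEnd.entrySet()) {
                    // keep the biggest end offset seen so far per partition
                    startOffsets.merge(e.getKey(), e.getValue(), Math::max);
                }
            }
            System.out.println(startOffsets);  // {0=150, 1=90}
        }
    }
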
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
new file mode 100644
index 0000000..d19aa63
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka.job;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class UpdateTimeRangeStep extends AbstractExecutable {
+
+ private static final Logger logger = LoggerFactory.getLogger(UpdateTimeRangeStep.class);
+
+ public UpdateTimeRangeStep() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+ final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+ final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+ final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
+ final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+ final Path outputFile = new Path(outputPath, partitionCol.getName());
+
+ String minValue = null, maxValue = null, currentValue = null;
+ FSDataInputStream inputStream = null;
+ BufferedReader bufferedReader = null;
+ try {
+ FileSystem fs = HadoopUtil.getFileSystem(outputPath);
+ inputStream = fs.open(outputFile);
+ bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
+ minValue = currentValue = bufferedReader.readLine();
+ while (currentValue != null) {
+ maxValue = currentValue;
+ currentValue = bufferedReader.readLine();
+ }
+ } catch (IOException e) {
+ logger.error("fail to read file " + outputFile, e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ } finally {
+ IOUtils.closeQuietly(bufferedReader);
+ IOUtils.closeQuietly(inputStream);
+ }
+
+ final DataType partitionColType = partitionCol.getType();
+ FastDateFormat dateFormat;
+ if (partitionColType.isDate()) {
+ dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
+ } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
+ dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+ } else if (partitionColType.isStringFamily()) {
+ String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
+ if (StringUtils.isEmpty(partitionDateFormat)) {
+ partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
+ }
+ dateFormat = DateFormat.getDateFormat(partitionDateFormat);
+ } else {
+ return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type");
+ }
+
+ try {
+ long startTime = dateFormat.parse(minValue).getTime();
+ long endTime = dateFormat.parse(maxValue).getTime();
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ segment.setDateRangeStart(startTime);
+ segment.setDateRangeEnd(endTime);
+ cubeBuilder.setToUpdateSegs(segment);
+ cubeManager.updateCube(cubeBuilder);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ } catch (Exception e) {
+ logger.error("fail to update cube segment offset", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+ }
+
+}
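
Note: the scan in doWork() takes the first line of the partition-column output file as the minimum and the last line read as the maximum, so it presumes that file is sorted ascending on the partition column. The same loop against an in-memory reader (sample dates are hypothetical; a plain java.io reader stands in for HDFS):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.StringReader;

    public class TimeRangeScanSketch {
        public static void main(String[] args) throws IOException {
            BufferedReader reader = new BufferedReader(
                    new StringReader("2016-10-01\n2016-10-02\n2016-10-09\n"));
            String minValue, maxValue = null, currentValue;
            minValue = currentValue = reader.readLine();  // first line -> min
            while (currentValue != null) {
                maxValue = currentValue;                  // last non-null line -> max
                currentValue = reader.readLine();
            }
            System.out.println(minValue + " .. " + maxValue);
            // prints: 2016-10-01 .. 2016-10-09
        }
    }
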
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
index 7a42598..894a144 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
@@ -6,15 +6,15 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.source.kafka.util;
import java.io.IOException;