Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/02 08:38:49 UTC
[27/32] kylin git commit: KYLIN-2122 Move the partition offset calculation before submitting job
KYLIN-2122 Move the partition offset calculation before submitting job
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2beccbf9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2beccbf9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2beccbf9
Branch: refs/heads/v1.6.0-rc1-hbase1.x
Commit: 2beccbf94fd8e43c072959aeec98cc05d8552788
Parents: b6f608f
Author: shaofengshi <sh...@apache.org>
Authored: Tue Oct 25 11:54:22 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 2 13:47:34 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/cube/CubeInstance.java | 9 ++
.../java/org/apache/kylin/cube/CubeManager.java | 103 +++++-------------
.../org/apache/kylin/cube/CubeManagerTest.java | 2 +-
.../org/apache/kylin/cube/CubeSegmentsTest.java | 2 +-
.../kylin/job/constant/ExecutableConstants.java | 1 +
.../java/org/apache/kylin/source/ISource.java | 13 ++-
.../org/apache/kylin/source/SourceFactory.java | 2 +-
.../apache/kylin/source/SourcePartition.java | 103 ++++++++++++++++++
.../kylin/engine/mr/JobBuilderSupport.java | 2 +
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 67 ++++++++++++
.../kylin/provision/BuildCubeWithEngine.java | 8 +-
.../kylin/provision/BuildCubeWithStream.java | 17 ++-
.../apache/kylin/rest/service/JobService.java | 16 +--
.../apache/kylin/source/hive/HiveSource.java | 32 +++++-
.../apache/kylin/source/kafka/KafkaMRInput.java | 90 ++++++++++------
.../apache/kylin/source/kafka/KafkaSource.java | 105 +++++++++++++++++-
.../kylin/source/kafka/config/KafkaConfig.java | 3 +
.../source/kafka/hadoop/KafkaInputFormat.java | 41 ++++---
.../kylin/source/kafka/job/SeekOffsetStep.java | 106 +------------------
.../source/kafka/job/UpdateTimeRangeStep.java | 78 +-------------
.../kylin/source/kafka/util/KafkaClient.java | 24 ++++-
21 files changed, 483 insertions(+), 341 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 720690d..7222457 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -439,6 +439,15 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
return this.getDescriptor().getAutoMergeTimeRanges() != null && this.getDescriptor().getAutoMergeTimeRanges().length > 0;
}
+ public CubeSegment getLastSegment() {
+ List<CubeSegment> existing = getSegments();
+ if (existing.isEmpty()) {
+ return null;
+ } else {
+ return existing.get(existing.size() - 1);
+ }
+ }
+
@Override
public int getSourceType() {
return getFactTableDesc().getSourceType();
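
A minimal sketch (not part of this commit) of how a caller might use the new getLastSegment() accessor; the cube name is a placeholder:

    CubeManager mgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
    CubeInstance cube = mgr.getCube("some_cube");   // hypothetical cube name
    CubeSegment last = cube.getLastSegment();
    if (last != null) {
        long nextStartDate = last.getDateRangeEnd();      // date-partitioned builds
        long nextStartOffset = last.getSourceOffsetEnd(); // offset-based (streaming) builds
    }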
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index a53849e..16b468f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -34,8 +34,6 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
@@ -68,9 +66,11 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.source.ReadableTable;
import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourcePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
@@ -434,52 +434,20 @@ public class CubeManager implements IRealizationProvider {
// append a full build segment
public CubeSegment appendSegment(CubeInstance cube) throws IOException {
- return appendSegment(cube, 0, 0, 0, 0, null, null);
+ return appendSegment(cube, 0, Long.MAX_VALUE, 0, 0, null, null);
}
public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate) throws IOException {
return appendSegment(cube, startDate, endDate, 0, 0, null, null);
}
+ public CubeSegment appendSegment(CubeInstance cube, SourcePartition sourcePartition) throws IOException {
+ return appendSegment(cube, sourcePartition.getStartDate(), sourcePartition.getEndDate(), sourcePartition.getStartOffset(), sourcePartition.getEndOffset(), sourcePartition.getSourcePartitionOffsetStart(), sourcePartition.getSourcePartitionOffsetEnd());
+ }
+
public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) throws IOException {
checkBuildingSegment(cube);
- if (sourcePartitionOffsetStart == null) {
- sourcePartitionOffsetStart = Maps.newHashMap();
- }
- if (sourcePartitionOffsetEnd == null) {
- sourcePartitionOffsetEnd = Maps.newHashMap();
- }
-
- boolean isOffsetsOn = endOffset != 0;
- if (isOffsetsOn == true) {
- checkSourceOffsets(startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
- }
-
- if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned()) {
- // try figure out a reasonable start if missing
- if (startDate == 0 && startOffset == 0) {
- final CubeSegment last = getLatestSegment(cube);
- if (last != null) {
- if (isOffsetsOn) {
- if (last.getSourceOffsetEnd() == Long.MAX_VALUE) {
- throw new IllegalStateException("There is already one pending for building segment, please submit request later.");
- }
- startOffset = last.getSourceOffsetEnd();
- sourcePartitionOffsetStart = last.getSourcePartitionOffsetEnd();
- } else {
- startDate = last.getDateRangeEnd();
- }
- }
- }
-
- } else {
- startDate = 0;
- endDate = Long.MAX_VALUE;
- startOffset = 0;
- endOffset = 0;
- }
-
CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset);
newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart);
newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd);
@@ -638,7 +606,7 @@ public class CubeManager implements IRealizationProvider {
return max;
}
- private CubeSegment getLatestSegment(CubeInstance cube) {
+ public CubeSegment getLatestSegment(CubeInstance cube) {
List<CubeSegment> existing = cube.getSegments();
if (existing.isEmpty()) {
return null;
@@ -647,49 +615,28 @@ public class CubeManager implements IRealizationProvider {
}
}
- private void checkBuildingSegment(CubeInstance cube) {
- int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments();
- if (cube.getBuildingSegments().size() >= maxBuldingSeg) {
- throw new IllegalStateException("There is already " + cube.getBuildingSegments().size() + " building segment; ");
+ private long calculateStartOffsetForAppendSegment(CubeInstance cube) {
+ List<CubeSegment> existing = cube.getSegments();
+ if (existing.isEmpty()) {
+ return 0;
+ } else {
+ return existing.get(existing.size() - 1).getSourceOffsetEnd();
}
}
- private void checkSourceOffsets(long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) {
- if (endOffset <= 0)
- return;
-
- if (startOffset >= endOffset) {
- throw new IllegalArgumentException("'startOffset' need be smaller than 'endOffset'");
- }
-
- if (startOffset > 0) {
- if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.size() == 0) {
- throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset");
- }
-
- long totalOffset = 0;
- for (Long v : sourcePartitionOffsetStart.values()) {
- totalOffset += v;
- }
-
- if (totalOffset != startOffset) {
- throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
- }
+ private long calculateStartDateForAppendSegment(CubeInstance cube) {
+ List<CubeSegment> existing = cube.getSegments();
+ if (existing.isEmpty()) {
+ return cube.getDescriptor().getPartitionDateStart();
+ } else {
+ return existing.get(existing.size() - 1).getDateRangeEnd();
}
+ }
- if (endOffset > 0 && endOffset != Long.MAX_VALUE) {
- if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.size() == 0) {
- throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset");
- }
-
- long totalOffset = 0;
- for (Long v : sourcePartitionOffsetEnd.values()) {
- totalOffset += v;
- }
-
- if (totalOffset != endOffset) {
- throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
- }
+ private void checkBuildingSegment(CubeInstance cube) {
+ int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments();
+ if (cube.getBuildingSegments().size() >= maxBuldingSeg) {
+ throw new IllegalStateException("There is already " + cube.getBuildingSegments().size() + " building segment; ");
}
}
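
With the date/offset back-filling and the offset sanity checks removed from appendSegment(), resolving a segment's real boundaries is now the caller's job. A sketch of the new submission sequence, mirroring the JobService change later in this commit:

    ISource source = SourceFactory.tableSource(cube);
    SourcePartition part = new SourcePartition(startDate, endDate, startOffset, endOffset,
            sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
    part = source.parsePartitionBeforeBuild(cube, part);  // gap-filling and validation happen here
    CubeSegment newSeg = cubeManager.appendSegment(cube, part);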
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index bb90d29..2904eb2 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -111,7 +111,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0, null, null);
seg1.setStatus(SegmentStatusEnum.READY);
- CubeSegment seg2 = mgr.appendSegment(cube, 0, 2000, 0, 0, null, null);
+ CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000, 0, 0, null, null);
seg2.setStatus(SegmentStatusEnum.READY);
CubeUpdate cubeBuilder = new CubeUpdate(cube);
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
index 828a3a9..a5bd821 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
@@ -110,7 +110,7 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase {
seg1.setStatus(SegmentStatusEnum.READY);
// append second
- CubeSegment seg2 = mgr.appendSegment(cube, 0, 2000);
+ CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000);
assertEquals(2, cube.getSegments().size());
assertEquals(1000, seg2.getDateRangeStart());
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index ad0b1b1..cec2e5d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -52,6 +52,7 @@ public final class ExecutableConstants {
public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
public static final String STEP_NAME_HIVE_CLEANUP = "Hive Cleanup";
+ public static final String STEP_NAME_KAFKA_CLEANUP = "Kafka Intermediate File Cleanup";
public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS";
public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
index e9216f9..5bff8a7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -18,15 +18,18 @@
package org.apache.kylin.source;
-import org.apache.kylin.metadata.model.TableDesc;
-
import java.util.List;
+import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.TableDesc;
+
public interface ISource {
- public <I> I adaptToBuildEngine(Class<I> engineInterface);
+ <I> I adaptToBuildEngine(Class<I> engineInterface);
+
+ ReadableTable createReadableTable(TableDesc tableDesc);
- public ReadableTable createReadableTable(TableDesc tableDesc);
+ List<String> getMRDependentResources(TableDesc table);
- public List<String> getMRDependentResources(TableDesc table);
+ SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition);
}
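
For a source with no notion of offsets, the new parsePartitionBeforeBuild() contract amounts to normalizing the offset fields; a minimal sketch (HiveSource below implements a fuller version that also back-fills the start date):

    @Override
    public SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) {
        SourcePartition result = SourcePartition.getCopyOf(srcPartition);
        result.setStartOffset(0);  // batch sources carry no offsets
        result.setEndOffset(0);
        return result;
    }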
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
index e82c6ed..5ce9014 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
@@ -31,7 +31,7 @@ public class SourceFactory {
private static ImplementationSwitch<ISource> sources;
static {
Map<Integer, String> impls = KylinConfig.getInstanceFromEnv().getSourceEngines();
- sources = new ImplementationSwitch<ISource>(impls, ISource.class);
+ sources = new ImplementationSwitch<>(impls, ISource.class);
}
public static ISource tableSource(ISourceAware aware) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
new file mode 100644
index 0000000..8ba749d
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import java.util.Map;
+
+/**
+ */
+public class SourcePartition {
+ long startDate;
+ long endDate;
+ long startOffset;
+ long endOffset;
+ Map<Integer, Long> sourcePartitionOffsetStart;
+ Map<Integer, Long> sourcePartitionOffsetEnd;
+
+ public SourcePartition() {
+ }
+
+ public SourcePartition(long startDate, long endDate, long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) {
+ this.startDate = startDate;
+ this.endDate = endDate;
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ this.sourcePartitionOffsetStart = sourcePartitionOffsetStart;
+ this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd;
+ }
+
+ public long getStartDate() {
+ return startDate;
+ }
+
+ public void setStartDate(long startDate) {
+ this.startDate = startDate;
+ }
+
+ public long getEndDate() {
+ return endDate;
+ }
+
+ public void setEndDate(long endDate) {
+ this.endDate = endDate;
+ }
+
+ public long getStartOffset() {
+ return startOffset;
+ }
+
+ public void setStartOffset(long startOffset) {
+ this.startOffset = startOffset;
+ }
+
+ public long getEndOffset() {
+ return endOffset;
+ }
+
+ public void setEndOffset(long endOffset) {
+ this.endOffset = endOffset;
+ }
+
+ public Map<Integer, Long> getSourcePartitionOffsetStart() {
+ return sourcePartitionOffsetStart;
+ }
+
+ public void setSourcePartitionOffsetStart(Map<Integer, Long> sourcePartitionOffsetStart) {
+ this.sourcePartitionOffsetStart = sourcePartitionOffsetStart;
+ }
+
+ public Map<Integer, Long> getSourcePartitionOffsetEnd() {
+ return sourcePartitionOffsetEnd;
+ }
+
+ public void setSourcePartitionOffsetEnd(Map<Integer, Long> sourcePartitionOffsetEnd) {
+ this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd;
+ }
+
+ public static SourcePartition getCopyOf(SourcePartition origin) {
+ SourcePartition copy = new SourcePartition();
+ copy.setStartDate(origin.getStartDate());
+ copy.setEndDate(origin.getEndDate());
+ copy.setStartOffset(origin.getStartOffset());
+ copy.setEndOffset(origin.getEndOffset());
+ copy.setSourcePartitionOffsetStart(origin.getSourcePartitionOffsetStart());
+ copy.setSourcePartitionOffsetEnd(origin.getSourcePartitionOffsetEnd());
+ return copy;
+ }
+}
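
A short usage sketch of the new value object (values invented). One design note: getCopyOf() copies the map references rather than the maps themselves, so code that mutates the offset maps on a copy (as KafkaSource does below for newly added partitions) is mutating shared state; worth keeping in mind when reusing a SourcePartition.

    Map<Integer, Long> starts = Maps.newHashMap();  // com.google.common.collect.Maps
    starts.put(0, 100L);
    starts.put(1, 200L);
    Map<Integer, Long> ends = Maps.newHashMap();
    ends.put(0, 150L);
    ends.put(1, 250L);
    // the totals (300, 400) equal the sums of the per-partition maps
    SourcePartition part = new SourcePartition(0, 0, 300L, 400L, starts, ends);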
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 159e5cb..47eb9c3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -95,12 +95,14 @@ public class JobBuilderSupport {
public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
final UpdateCubeInfoAfterBuildStep result = new UpdateCubeInfoAfterBuildStep();
result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+ result.getParams().put(BatchConstants.CFG_OUTPUT_PATH, getFactDistinctColumnsPath(jobId));
CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
CubingExecutableUtil.setIndexPath(this.getSecondaryIndexPath(jobId), result.getParams());
+
return result;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index d6435b7..4e1be57 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -18,16 +18,29 @@
package org.apache.kylin.engine.mr.steps;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +72,10 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
segment.setInputRecordsSize(sourceSizeBytes);
try {
+ if (segment.isSourceOffsetsOn()) {
+ updateTimeRange(segment);
+ }
+
cubeManager.promoteNewlyBuiltSegments(cube, segment);
return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
} catch (IOException e) {
@@ -67,4 +84,54 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
}
}
+ private void updateTimeRange(CubeSegment segment) throws IOException {
+ final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
+ final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+ final Path outputFile = new Path(outputPath, partitionCol.getName());
+
+ String minValue = null, maxValue = null, currentValue = null;
+ FSDataInputStream inputStream = null;
+ BufferedReader bufferedReader = null;
+ try {
+ FileSystem fs = HadoopUtil.getFileSystem(outputPath);
+ inputStream = fs.open(outputFile);
+ bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
+ minValue = currentValue = bufferedReader.readLine();
+ while (currentValue != null) {
+ maxValue = currentValue;
+ currentValue = bufferedReader.readLine();
+ }
+ } catch (IOException e) {
+ throw e;
+ } finally {
+ IOUtils.closeQuietly(bufferedReader);
+ IOUtils.closeQuietly(inputStream);
+ }
+
+ final DataType partitionColType = partitionCol.getType();
+ FastDateFormat dateFormat;
+ if (partitionColType.isDate()) {
+ dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
+ } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
+ dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+ } else if (partitionColType.isStringFamily()) {
+ String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
+ if (StringUtils.isEmpty(partitionDateFormat)) {
+ partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
+ }
+ dateFormat = DateFormat.getDateFormat(partitionDateFormat);
+ } else {
+ throw new IllegalStateException("Type " + partitionColType + " is not valid partition column type");
+ }
+
+ try {
+ long startTime = dateFormat.parse(minValue).getTime();
+ long endTime = dateFormat.parse(maxValue).getTime();
+ segment.setDateRangeStart(startTime);
+ segment.setDateRangeEnd(endTime);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
}
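
updateTimeRange() reads the fact-distinct output for the partition column, taking the first line as the minimum and the last line as the maximum; this appears to rely on the fact-distinct MR step emitting values in ascending order. A sketch of the final parse, assuming DEFAULT_DATE_PATTERN is yyyy-MM-dd as in Kylin's DateFormat utility (the date strings stand in for minValue and maxValue read from the file):

    try {
        FastDateFormat fmt = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
        segment.setDateRangeStart(fmt.parse("2016-01-01").getTime()); // minValue
        segment.setDateRangeEnd(fmt.parse("2016-10-25").getTime());   // maxValue
    } catch (java.text.ParseException e) {
        throw new IllegalStateException(e);
    }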
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index f6c8801..180d8d9 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -55,6 +55,9 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
@@ -404,7 +407,10 @@ public class BuildCubeWithEngine {
}
private String buildSegment(String cubeName, long startDate, long endDate) throws Exception {
- CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, endDate);
+ CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+ ISource source = SourceFactory.tableSource(cubeInstance);
+ SourcePartition partition = source.parsePartitionBeforeBuild(cubeInstance, new SourcePartition(0, endDate, 0, 0, null, null));
+ CubeSegment segment = cubeManager.appendSegment(cubeInstance, partition.getStartDate(), partition.getEndDate());
DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 2faa8d0..000ac16 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -18,6 +18,8 @@
package org.apache.kylin.provision;
+import static java.lang.Thread.sleep;
+
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
@@ -32,7 +34,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Lists;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.requests.MetadataResponse;
@@ -44,8 +45,6 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.metadata.streaming.StreamingConfig;
-import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
@@ -55,6 +54,11 @@ import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.job.streaming.Kafka10DataLoader;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingManager;
+import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.BrokerConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -64,7 +68,7 @@ import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.lang.Thread.sleep;
+import com.google.common.collect.Lists;
/**
* for streaming cubing case "test_streaming_table"
@@ -253,7 +257,10 @@ public class BuildCubeWithStream {
}
protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
- CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, null, null);
+ CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+ ISource source = SourceFactory.tableSource(cubeInstance);
+ SourcePartition partition = source.parsePartitionBeforeBuild(cubeInstance, new SourcePartition(0, 0, startOffset, endOffset, null, null));
+ CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), partition);
DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index bc4d89c..49b9b9f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -51,6 +51,9 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.BadRequestException;
+import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourcePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -206,11 +209,12 @@ public class JobService extends BasicService {
}
checkCubeDescSignature(cube);
-
DefaultChainedExecutable job;
-
if (buildType == CubeBuildTypeEnum.BUILD) {
- CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
+ ISource source = SourceFactory.tableSource(cube);
+ SourcePartition sourcePartition = new SourcePartition(startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
+ sourcePartition = source.parsePartitionBeforeBuild(cube, sourcePartition);
+ CubeSegment newSeg = getCubeManager().appendSegment(cube, sourcePartition);
job = EngineFactory.createBatchCubingJob(newSeg, submitter);
} else if (buildType == CubeBuildTypeEnum.MERGE) {
CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force);
@@ -364,15 +368,11 @@ public class JobService extends BasicService {
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
public JobInstance cancelJob(JobInstance job) throws IOException, JobException {
- // CubeInstance cube = this.getCubeManager().getCube(job.getRelatedCube());
- // for (BuildCubeJob cubeJob: listAllCubingJobs(cube.getName(), null, EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING))) {
- // getExecutableManager().stopJob(cubeJob.getId());
- // }
CubeInstance cubeInstance = getCubeManager().getCube(job.getRelatedCube());
final String segmentIds = job.getRelatedSegment();
for (String segmentId : StringUtils.split(segmentIds)) {
final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
- if (segment != null && segment.getStatus() == SegmentStatusEnum.NEW) {
+ if (segment != null && (segment.getStatus() == SegmentStatusEnum.NEW || segment.getDateRangeEnd() == 0)) {
// Remove this segments
CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
cubeBuilder.setToRemoveSegs(segment);
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index e9cebea..af0a519 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -18,13 +18,18 @@
package org.apache.kylin.source.hive;
-import com.google.common.collect.Lists;
+import java.util.List;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ReadableTable;
-import java.util.List;
+import com.google.common.collect.Lists;
+import org.apache.kylin.source.SourcePartition;
//used by reflection
public class HiveSource implements ISource {
@@ -49,4 +54,27 @@ public class HiveSource implements ISource {
return Lists.newArrayList();
}
+ @Override
+ public SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) {
+ SourcePartition result = SourcePartition.getCopyOf(srcPartition);
+ CubeInstance cube = (CubeInstance) buildable;
+ if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == true) {
+ // normal partitioned cube
+ if (result.getStartDate() == 0) {
+ final CubeSegment last = cube.getLastSegment();
+ if (last != null) {
+ result.setStartDate(last.getDateRangeEnd());
+ }
+ }
+ } else {
+ // full build
+ result.setStartDate(0);
+ result.setEndDate(Long.MAX_VALUE);
+ }
+
+ result.setStartOffset(0);
+ result.setEndOffset(0);
+ return result;
+ }
+
}
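
A worked example (values matching the updated CubeSegmentsTest): appending to a date-partitioned cube whose last segment ends at t=1000, with a requested range of [0, 2000), yields a segment of [1000, 2000); offsets are always forced to zero because Hive is a batch source.

    // hiveSource and cube are assumed to be in scope
    SourcePartition req = new SourcePartition(0, 2000, 0, 0, null, null);
    SourcePartition resolved = hiveSource.parsePartitionBeforeBuild(cube, req);
    // resolved.getStartDate() == 1000, resolved.getEndDate() == 2000
    // resolved.getStartOffset() == 0, resolved.getEndOffset() == 0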
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index fb2a949..cdd7272 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -19,8 +19,12 @@ package org.apache.kylin.source.kafka;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.List;
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -30,26 +34,33 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
import org.apache.kylin.source.kafka.job.MergeOffsetStep;
-import org.apache.kylin.source.kafka.job.SeekOffsetStep;
-import org.apache.kylin.source.kafka.job.UpdateTimeRangeStep;
+import com.google.common.base.Function;
import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class KafkaMRInput implements IMRInput {
@@ -57,7 +68,7 @@ public class KafkaMRInput implements IMRInput {
@Override
public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
- this.cubeSegment = (CubeSegment) flatDesc.getSegment();
+ this.cubeSegment = (CubeSegment)flatDesc.getSegment();
return new BatchCubingInputSide(cubeSegment);
}
@@ -65,8 +76,14 @@ public class KafkaMRInput implements IMRInput {
public IMRTableInputFormat getTableInputFormat(TableDesc table) {
KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(table.getIdentity());
- TableRef tableRef = cubeSegment.getCubeInstance().getDataModelDesc().findTable(table.getIdentity());
- List<TblColRef> columns = Lists.newArrayList(tableRef.getColumns());
+ List<TblColRef> columns = Lists.transform(Arrays.asList(table.getColumns()), new Function<ColumnDesc, TblColRef>() {
+ @Nullable
+ @Override
+ public TblColRef apply(ColumnDesc input) {
+ return input.getRef();
+ }
+ });
+
return new KafkaTableInputFormat(cubeSegment, columns, kafkaConfig, null);
}
@@ -77,15 +94,11 @@ public class KafkaMRInput implements IMRInput {
public static class KafkaTableInputFormat implements IMRTableInputFormat {
private final CubeSegment cubeSegment;
- private List<TblColRef> columns;
private StreamingParser streamingParser;
- private KafkaConfig kafkaConfig;
private final JobEngineConfig conf;
public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) {
this.cubeSegment = cubeSegment;
- this.columns = columns;
- this.kafkaConfig = kafkaConfig;
this.conf = conf;
try {
streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
@@ -131,21 +144,9 @@ public class KafkaMRInput implements IMRInput {
@Override
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(createUpdateSegmentOffsetStep(jobFlow.getId()));
jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId()));
}
- public SeekOffsetStep createUpdateSegmentOffsetStep(String jobId) {
- final SeekOffsetStep result = new SeekOffsetStep();
- result.setName("Seek and update offset step");
-
- CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
- CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
- CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
-
- return result;
- }
-
private MapReduceExecutable createSaveKafkaDataStep(String jobId) {
MapReduceExecutable result = new MapReduceExecutable();
@@ -167,14 +168,10 @@ public class KafkaMRInput implements IMRInput {
@Override
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
- final UpdateTimeRangeStep result = new UpdateTimeRangeStep();
- result.setName("Update Segment Time Range");
- CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
- CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
- CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams());
- JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "SYSTEM");
- result.getParams().put(BatchConstants.CFG_OUTPUT_PATH, jobBuilderSupport.getFactDistinctColumnsPath(jobFlow.getId()));
- jobFlow.addTask(result);
+ GarbageCollectionStep step = new GarbageCollectionStep();
+ step.setName(ExecutableConstants.STEP_NAME_KAFKA_CLEANUP);
+ step.setDataPath(outputPath);
+ jobFlow.addTask(step);
}
@@ -211,4 +208,37 @@ public class KafkaMRInput implements IMRInput {
}
}
+ static class GarbageCollectionStep extends AbstractExecutable {
+ private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ StringBuffer output = new StringBuffer();
+ try {
+ rmdirOnHDFS(getDataPath());
+ } catch (IOException e) {
+ logger.error("job:" + getId() + " execute finished with exception", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+ }
+
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+ }
+
+ private void rmdirOnHDFS(String path) throws IOException {
+ Path externalDataPath = new Path(path);
+ FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+ if (fs.exists(externalDataPath)) {
+ fs.delete(externalDataPath, true);
+ }
+ }
+
+ public void setDataPath(String externalDataPath) {
+ setParam("dataPath", externalDataPath);
+ }
+
+ private String getDataPath() {
+ return getParam("dataPath");
+ }
+
+ }
}
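
The new cleanup step replaces the old phase-4 UpdateTimeRangeStep; time-range maintenance moved into UpdateCubeInfoAfterBuildStep above, so all that is left here is removing the intermediate Kafka dump from HDFS. The target path is stored as an ordinary executable parameter ("dataPath"), so it is persisted with the job, and the exists() guard makes the recursive delete safe to re-run after a partial failure. A sketch of the delete in isolation (the path layout is hypothetical):

    Path externalDataPath = new Path("/kylin/.../kafka_hadoop/job_id");  // hypothetical layout
    FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
    if (fs.exists(externalDataPath)) {
        fs.delete(externalDataPath, true);  // true = recursive
    }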
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 208c0ce..bb676e6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -18,15 +18,24 @@
package org.apache.kylin.source.kafka;
-import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.util.KafkaClient;
-import java.util.List;
+import com.google.common.collect.Lists;
//used by reflection
public class KafkaSource implements ISource {
@@ -54,4 +63,94 @@ public class KafkaSource implements ISource {
return dependentResources;
}
+ @Override
+ public SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) {
+ checkSourceOffsets(srcPartition);
+ final SourcePartition result = SourcePartition.getCopyOf(srcPartition);
+ final CubeInstance cube = (CubeInstance) buildable;
+ if (result.getStartOffset() == 0) {
+ final CubeSegment last = cube.getLastSegment();
+ if (last != null) {
+ // from last seg's end position
+ result.setSourcePartitionOffsetStart(last.getSourcePartitionOffsetEnd());
+ } else if (cube.getDescriptor().getPartitionOffsetStart() != null && cube.getDescriptor().getPartitionOffsetStart().size() > 0) {
+ result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart());
+ } else {
+ // from the topic's very begining;
+ result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
+ }
+ }
+
+ final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(cube.getConfig()).getKafkaConfig(cube.getFactTable());
+ final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
+ final String topic = kafakaConfig.getTopic();
+ try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
+ final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+ if (partitionInfos.size() > result.getSourcePartitionOffsetStart().size()) {
+ // has new partition added
+ for (int x = result.getSourcePartitionOffsetStart().size(); x < partitionInfos.size(); x++) {
+ long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
+ result.getSourcePartitionOffsetStart().put(partitionInfos.get(x).partition(), earliest);
+ }
+ }
+ }
+
+ if (result.getEndOffset() == Long.MAX_VALUE) {
+ result.setSourcePartitionOffsetEnd(KafkaClient.getCurrentOffsets(cube));
+ }
+
+ long totalStartOffset = 0, totalEndOffset = 0;
+ for (Long v : result.getSourcePartitionOffsetStart().values()) {
+ totalStartOffset += v;
+ }
+ for (Long v : result.getSourcePartitionOffsetEnd().values()) {
+ totalEndOffset += v;
+ }
+
+ result.setStartOffset(totalStartOffset);
+ result.setEndOffset(totalEndOffset);
+
+ return result;
+ }
+
+ private void checkSourceOffsets(SourcePartition srcPartition) {
+ long startOffset = srcPartition.getStartOffset();
+ long endOffset = srcPartition.getEndOffset();
+ final Map<Integer, Long> sourcePartitionOffsetStart = srcPartition.getSourcePartitionOffsetStart();
+ final Map<Integer, Long> sourcePartitionOffsetEnd = srcPartition.getSourcePartitionOffsetEnd();
+ if (endOffset <= 0 || startOffset >= endOffset) {
+ throw new IllegalArgumentException("'startOffset' need be smaller than 'endOffset'");
+ }
+
+ if (startOffset > 0) {
+ if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.size() == 0) {
+ throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset");
+ }
+
+ long totalOffset = 0;
+ for (Long v : sourcePartitionOffsetStart.values()) {
+ totalOffset += v;
+ }
+
+ if (totalOffset != startOffset) {
+ throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
+ }
+ }
+
+ if (endOffset > 0 && endOffset != Long.MAX_VALUE) {
+ if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.size() == 0) {
+ throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset");
+ }
+
+ long totalOffset = 0;
+ for (Long v : sourcePartitionOffsetEnd.values()) {
+ totalOffset += v;
+ }
+
+ if (totalOffset != endOffset) {
+ throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
+ }
+ }
+ }
+
}
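
A worked example of checkSourceOffsets(): whenever a total offset is a concrete position, the per-partition map must be present and must sum to it; an endOffset of Long.MAX_VALUE means "up to the latest" and is resolved later in parsePartitionBeforeBuild() via KafkaClient.getCurrentOffsets().

    Map<Integer, Long> starts = Maps.newHashMap();
    starts.put(0, 100L);
    starts.put(1, 200L);
    // Valid: 100 + 200 == 300; the end map may be omitted when endOffset is MAX_VALUE.
    SourcePartition ok = new SourcePartition(0, 0, 300L, Long.MAX_VALUE, starts, null);
    // Invalid: 250 != 100 + 200; checkSourceOffsets() throws IllegalArgumentException.
    SourcePartition bad = new SourcePartition(0, 0, 250L, Long.MAX_VALUE, starts, null);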
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index c538acb..157d83c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -61,6 +61,7 @@ public class KafkaConfig extends RootPersistentEntity {
@JsonProperty("parserName")
private String parserName;
+ @Deprecated
@JsonProperty("margin")
private long margin;
@@ -120,10 +121,12 @@ public class KafkaConfig extends RootPersistentEntity {
this.name = name;
}
+ @Deprecated
public long getMargin() {
return margin;
}
+ @Deprecated
public void setMargin(long margin) {
this.margin = margin;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index 81f6bac..fe0e2cc 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -23,9 +23,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import org.apache.kylin.source.kafka.util.KafkaClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
@@ -36,6 +33,10 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kylin.source.kafka.util.KafkaClient;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
/**
* Convert Kafka topic to Hadoop InputFormat
@@ -45,16 +46,16 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
- Configuration conf = context.getConfiguration();
+ final Configuration conf = context.getConfiguration();
- String brokers = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BROKERS);
- String inputTopic = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TOPIC);
- String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
- Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN));
- Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX));
+ final String brokers = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BROKERS);
+ final String inputTopic = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TOPIC);
+ final String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
+ final Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN));
+ final Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX));
- Map<Integer, Long> startOffsetMap = Maps.newHashMap();
- Map<Integer, Long> endOffsetMap = Maps.newHashMap();
+ final Map<Integer, Long> startOffsetMap = Maps.newHashMap();
+ final Map<Integer, Long> endOffsetMap = Maps.newHashMap();
for (int i = partitionMin; i <= partitionMax; i++) {
String start = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_START + i);
String end = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_END + i);
@@ -64,23 +65,19 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
}
}
- List<InputSplit> splits = new ArrayList<InputSplit>();
+ final List<InputSplit> splits = new ArrayList<InputSplit>();
try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, null)) {
- List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
+ final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side");
for (int i = 0; i < partitionInfos.size(); i++) {
- PartitionInfo partition = partitionInfos.get(i);
+ final PartitionInfo partition = partitionInfos.get(i);
int partitionId = partition.partition();
if (startOffsetMap.containsKey(partitionId) == false) {
throw new IllegalStateException("Partition '" + partitionId + "' not exists.");
}
- if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) {
- InputSplit split = new KafkaInputSplit(
- brokers, inputTopic,
- partitionId,
- startOffsetMap.get(partitionId), endOffsetMap.get(partitionId)
- );
+ if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) {
+ InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, startOffsetMap.get(partitionId), endOffsetMap.get(partitionId));
splits.add(split);
}
}
@@ -89,9 +86,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
}
@Override
- public RecordReader<LongWritable, BytesWritable> createRecordReader(
- InputSplit arg0, TaskAttemptContext arg1) throws IOException,
- InterruptedException {
+ public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
return new KafkaInputRecordReader();
}
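
Each Kafka partition with pending messages becomes exactly one input split, and the per-partition offsets travel through the MR job configuration. A sketch of the producer side of that contract (key constants are those referenced above in KafkaFlatTableJob; brokers, topic, and offsets are invented):

    Configuration conf = job.getConfiguration();
    conf.set(KafkaFlatTableJob.CONFIG_KAFKA_BROKERS, "broker1:9092,broker2:9092");
    conf.set(KafkaFlatTableJob.CONFIG_KAFKA_TOPIC, "test_streaming_topic");
    conf.set(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP, cubeName);
    conf.set(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN, "0");
    conf.set(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX, "1");
    conf.set(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_START + 0, "100");
    conf.set(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_END + 0, "150");
    conf.set(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_START + 1, "200");
    conf.set(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_END + 1, "250");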
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
index 98d6e4d..acaa751 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
@@ -17,28 +17,15 @@
*/
package org.apache.kylin.source.kafka.job;
-import org.apache.kylin.source.kafka.KafkaConfigManager;
-import org.apache.kylin.source.kafka.util.KafkaClient;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
/**
+ * Deprecated, not in use.
*/
public class SeekOffsetStep extends AbstractExecutable {
@@ -50,97 +37,8 @@ public class SeekOffsetStep extends AbstractExecutable {
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
- final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
- final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-
- Map<Integer, Long> startOffsets = segment.getSourcePartitionOffsetStart();
- Map<Integer, Long> endOffsets = segment.getSourcePartitionOffsetEnd();
-
- if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) {
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided.");
- }
-
- final Map<Integer, Long> cubeDescStart = cube.getDescriptor().getPartitionOffsetStart();
- if (cube.getSegments().size() == 1 && cubeDescStart != null && cubeDescStart.size() > 0) {
- logger.info("This is the first segment, and has initiated the start offsets, will use it");
- startOffsets = cubeDescStart;
- }
-
- final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable());
- final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
- final String topic = kafakaConfig.getTopic();
- try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
- final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
-
- if (startOffsets.isEmpty()) {
- // user didn't specify start offset, use the biggest offset in existing segments as start
- for (CubeSegment seg : cube.getSegments()) {
- Map<Integer, Long> segEndOffset = seg.getSourcePartitionOffsetEnd();
- for (PartitionInfo partition : partitionInfos) {
- int partitionId = partition.partition();
- if (segEndOffset.containsKey(partitionId)) {
- startOffsets.put(partitionId, Math.max(startOffsets.containsKey(partitionId) ? startOffsets.get(partitionId) : 0, segEndOffset.get(partitionId)));
- }
- }
- }
- logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
- }
-
- if (partitionInfos.size() > startOffsets.size()) {
- // has new partition added
- for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
- long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
- startOffsets.put(partitionInfos.get(x).partition(), earliest);
- }
- }
-
- if (endOffsets.isEmpty()) {
- // user didn't specify end offset, use latest offset in kafka
- for (PartitionInfo partitionInfo : partitionInfos) {
- long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition());
- endOffsets.put(partitionInfo.partition(), latest);
- }
-
- logger.info("Get end offset for segment " + segment.getName() + ": " + endOffsets.toString());
- }
- }
-
- long totalStartOffset = 0, totalEndOffset = 0;
- for (Long v : startOffsets.values()) {
- totalStartOffset += v;
- }
- for (Long v : endOffsets.values()) {
- totalEndOffset += v;
- }
-
- if (totalEndOffset > totalStartOffset) {
- segment.setSourceOffsetStart(totalStartOffset);
- segment.setSourceOffsetEnd(totalEndOffset);
- segment.setSourcePartitionOffsetStart(startOffsets);
- segment.setSourcePartitionOffsetEnd(endOffsets);
- segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset));
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToUpdateSegs(segment);
- try {
- cubeManager.updateCube(cubeBuilder);
- } catch (IOException e) {
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset + ", message count: " + (totalEndOffset - totalStartOffset));
- } else {
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToRemoveSegs(segment);
- try {
- cubeManager.updateCube(cubeBuilder);
- } catch (IOException e) {
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
-
- return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new message comes");
- }
-
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "Not in use");
}
}
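With the step above reduced to a no-op, the offset seeking now happens before the job is submitted (in KafkaSource, per this commit's title). The per-partition helpers the removed code relied on, KafkaClient.getEarliestOffset and getLatestOffset, are not shown in this mail; the following is a minimal sketch of their likely shape, assuming the kafka-clients 0.10 consumer API, not the code from this commit:

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Sketch only: returns the log-end offset of one partition, using manual
    // assignment so no consumer-group rebalance is triggered.
    public static long getLatestOffset(KafkaConsumer consumer, String topic, int partitionId) {
        TopicPartition topicPartition = new TopicPartition(topic, partitionId);
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.seekToEnd(Collections.singletonList(topicPartition));
        return consumer.position(topicPartition); // next offset to be written
    }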
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
index d19aa63..8c31c70 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
@@ -17,34 +17,15 @@
*/
package org.apache.kylin.source.kafka.job;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
+ * Deprecated, not in use.
*/
public class UpdateTimeRangeStep extends AbstractExecutable {
@@ -56,62 +37,7 @@ public class UpdateTimeRangeStep extends AbstractExecutable {
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
- final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
- final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
- final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
- final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
- final Path outputFile = new Path(outputPath, partitionCol.getName());
-
- String minValue = null, maxValue = null, currentValue = null;
- FSDataInputStream inputStream = null;
- BufferedReader bufferedReader = null;
- try {
- FileSystem fs = HadoopUtil.getFileSystem(outputPath);
- inputStream = fs.open(outputFile);
- bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
- minValue = currentValue = bufferedReader.readLine();
- while (currentValue != null) {
- maxValue = currentValue;
- currentValue = bufferedReader.readLine();
- }
- } catch (IOException e) {
- logger.error("fail to read file " + outputFile, e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- } finally {
- IOUtils.closeQuietly(bufferedReader);
- IOUtils.closeQuietly(inputStream);
- }
-
- final DataType partitionColType = partitionCol.getType();
- FastDateFormat dateFormat;
- if (partitionColType.isDate()) {
- dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
- } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
- dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
- } else if (partitionColType.isStringFamily()) {
- String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
- if (StringUtils.isEmpty(partitionDateFormat)) {
- partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
- }
- dateFormat = DateFormat.getDateFormat(partitionDateFormat);
- } else {
- return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type");
- }
-
- try {
- long startTime = dateFormat.parse(minValue).getTime();
- long endTime = dateFormat.parse(maxValue).getTime();
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- segment.setDateRangeStart(startTime);
- segment.setDateRangeEnd(endTime);
- cubeBuilder.setToUpdateSegs(segment);
- cubeManager.updateCube(cubeBuilder);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } catch (Exception e) {
- logger.error("fail to update cube segment offset", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
}
}
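UpdateTimeRangeStep is likewise stubbed out; the segment date range is now derived after the cube build instead of in this streaming-specific step. The removed code's key trick is still worth noting: the MR job writes the partition column's values in sorted order, so min and max come from one sequential pass, the first line and the last line. A condensed sketch of that scan, where fs and outputFile stand in for the HDFS FileSystem and output Path the step resolved:

    // Sketch: first line of the sorted output is the min, last line read is the max.
    String minValue = null, maxValue = null, line;
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(outputFile)))) {
        minValue = line = reader.readLine();
        while (line != null) {
            maxValue = line;
            line = reader.readLine();
        }
    }

Here try-with-resources stands in for the IOUtils.closeQuietly calls of the original; both close the underlying FSDataInputStream.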
http://git-wip-us.apache.org/repos/asf/kylin/blob/2beccbf9/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 685af6a..a0bbd22 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -59,7 +59,7 @@ public class KafkaClient {
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 50);
- props.put("timeout.ms", "30000");
+ props.put("request.timeout.ms", "30000");
if (properties != null) {
for (Map.Entry entry : properties.entrySet()) {
props.put(entry.getKey(), entry.getValue());
@@ -75,12 +75,12 @@ public class KafkaClient {
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", consumerGroup);
props.put("session.timeout.ms", "30000");
- props.put("enable.auto.commit", "false");
if (properties != null) {
for (Map.Entry entry : properties.entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
}
+ props.put("enable.auto.commit", "false");
return props;
}
@@ -126,7 +126,25 @@ public class KafkaClient {
try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
- long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition());
+ long latest = getLatestOffset(consumer, topic, partitionInfo.partition());
+ startOffsets.put(partitionInfo.partition(), latest);
+ }
+ }
+ return startOffsets;
+ }
+
+
+ public static Map<Integer, Long> getEarliestOffsets(final CubeInstance cubeInstance) {
+ final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(cubeInstance.getConfig()).getKafkaConfig(cubeInstance.getFactTable());
+
+ final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+ final String topic = kafkaConfig.getTopic();
+
+ Map<Integer, Long> startOffsets = Maps.newHashMap();
+ try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
+ final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ long latest = getEarliestOffset(consumer, topic, partitionInfo.partition());
startOffsets.put(partitionInfo.partition(), latest);
}
}
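The new getEarliestOffsets mirrors getLatestOffsets; only the per-partition call differs (note the loop variable is still named latest even though it now holds the earliest offset, since the closing lines are shared diff context). The single-partition helper is not part of this hunk either; a sketch under the same 0.10 consumer assumption:

    // Sketch only; assumed shape of KafkaClient.getEarliestOffset.
    public static long getEarliestOffset(KafkaConsumer consumer, String topic, int partitionId) {
        TopicPartition topicPartition = new TopicPartition(topic, partitionId);
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.seekToBeginning(Collections.singletonList(topicPartition));
        return consumer.position(topicPartition); // earliest retained offset
    }

Two smaller fixes in this file are easy to miss: the producer config key is corrected to request.timeout.ms (timeout.ms is not a recognized property in the 0.10 producer), and enable.auto.commit=false is now applied after the caller-supplied properties are merged, so a caller cannot accidentally re-enable auto commit; Kylin tracks consumed offsets in segment metadata itself.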