You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/06 13:05:43 UTC
[1/3] incubator-kylin git commit: KYLIN-1116 use local dictionary for inverted index building
Repository: incubator-kylin
Updated Branches:
refs/heads/2.x-staging 8a77b8600 -> b3fefd140
KYLIN-1116 use local dictionary for inverted index building
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b3fefd14
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b3fefd14
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b3fefd14
Branch: refs/heads/2.x-staging
Commit: b3fefd1406b81c87e3cdede96c010fcade7ef1a0
Parents: d5d8a40
Author: shaofengshi <sh...@apache.org>
Authored: Fri Nov 6 20:02:30 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Nov 6 20:04:02 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/job/BuildIIWithStreamTest.java | 2 +-
.../org/apache/kylin/job/hadoop/invertedindex/IITest.java | 2 +-
.../engine/mr/invertedindex/InvertedIndexReducer.java | 2 +-
.../apache/kylin/invertedindex/index/SliceBuilder.java | 6 ++----
.../java/org/apache/kylin/invertedindex/model/IIDesc.java | 10 ----------
5 files changed, 5 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b3fefd14/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 2d40b09..ddfd399 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -190,7 +190,7 @@ public class BuildIIWithStreamTest {
ToolRunner.run(new IICreateHTableJob(), args);
final IIDesc iiDesc = segment.getIIDesc();
- final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0, iiDesc.isUseLocalDictionary());
+ final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0);
List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
int count = sorted.size();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b3fefd14/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index a393ce3..ac91efb 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -102,7 +102,7 @@ public class IITest extends LocalFileMetadataTestCase {
StreamingBatch batch = new StreamingBatch(streamingMessages, Pair.newPair(0L, System.currentTimeMillis()));
iiRows = Lists.newArrayList();
- final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice((batch));
+ final Slice slice = new SliceBuilder(iiDesc, (short) 0).buildSlice((batch));
IIKeyValueCodec codec = new IIKeyValueCodec(slice.getInfo());
for (IIRow iiRow : codec.encodeKeyValue(slice)) {
iiRows.add(iiRow);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b3fefd14/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
index 56c0b9e..c223159 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
@@ -73,7 +73,7 @@ public class InvertedIndexReducer extends KylinReducer<LongWritable, Object, Imm
sliceSize = ii.getDescriptor().getSliceSize();
short shard = (short) context.getTaskAttemptID().getTaskID().getId();
System.out.println("Generating to shard - " + shard);
- sliceBuilder = new SliceBuilder(seg.getIIDesc(), shard, true);
+ sliceBuilder = new SliceBuilder(seg.getIIDesc(), shard);
messages = Lists.newArrayListWithCapacity(sliceSize);
immutableBytesWritable = new ImmutableBytesWritable();
valueBuf = ByteBuffer.allocate(1024 * 1024); // 1MB
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b3fefd14/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
index 1c293d7..ef63b0f 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
@@ -38,12 +38,10 @@ public final class SliceBuilder {
private final BatchSliceMaker sliceMaker;
private final IIDesc iiDesc;
- private final boolean useLocalDict;
- public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) {
+ public SliceBuilder(IIDesc desc, short shard) {
this.iiDesc = desc;
this.sliceMaker = new BatchSliceMaker(desc, shard);
- this.useLocalDict = useLocalDict;
}
public Slice buildSlice(StreamingBatch microStreamBatch) {
@@ -54,7 +52,7 @@ public final class SliceBuilder {
return input.getData();
}
});
- final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
+ final Dictionary<?>[] dictionaries = IIDictionaryBuilder.buildDictionary(messages, iiDesc);
TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
return build(messages, tableRecordInfo, dictionaries);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b3fefd14/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index bfa4eaa..71737dc 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -78,8 +78,6 @@ public class IIDesc extends RootPersistentEntity {
private short sharding = 1; // parallelism
@JsonProperty("slice_size")
private int sliceSize = 50000; // no. rows
- @JsonProperty("useLocalDictionary")
- private boolean useLocalDictionary = true;
@JsonProperty("engine_type")
private int engineType = IEngineAware.ID_MR_II;
@@ -374,14 +372,6 @@ public class IIDesc extends RootPersistentEntity {
this.name = name;
}
- public boolean isUseLocalDictionary() {
- return useLocalDictionary;
- }
-
- public void setUseLocalDictionary(boolean useLocalDictionary) {
- this.useLocalDictionary = useLocalDictionary;
- }
-
public String calculateSignature() {
MessageDigest md = null;
try {
[3/3] incubator-kylin git commit: KYLIN-1116 Use local dictionary for InvertedIndex batch building
Posted by sh...@apache.org.
KYLIN-1116 Use local dictionary for InvertedIndex batch building
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d5d8a403
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d5d8a403
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d5d8a403
Branch: refs/heads/2.x-staging
Commit: d5d8a403830d3a3be35662cc19d2dc263a7b7fbd
Parents: 8a77b86
Author: shaofengshi <sh...@apache.org>
Authored: Fri Nov 6 16:24:29 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Nov 6 20:04:02 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/job/BuildIIWithStreamTest.java | 15 +-
.../kylin/job/hadoop/invertedindex/IITest.java | 11 +-
.../kylin/common/util/StreamingBatch.java | 59 +++++
.../kylin/common/util/StreamingMessage.java | 43 ++++
.../realization/IRealizationSegment.java | 1 -
.../kylin/engine/mr/JobBuilderSupport.java | 8 +-
.../engine/mr/common/AbstractHadoopJob.java | 3 -
.../mr/invertedindex/BatchIIJobBuilder.java | 61 +-----
.../CreateInvertedIndexDictionaryJob.java | 70 ------
.../IIDistinctColumnsCombiner.java | 58 -----
.../mr/invertedindex/IIDistinctColumnsJob.java | 138 ------------
.../invertedindex/IIDistinctColumnsMapper.java | 75 -------
.../invertedindex/IIDistinctColumnsReducer.java | 77 -------
.../engine/mr/invertedindex/IIJobBuilder.java | 219 -------------------
.../mr/invertedindex/InvertedIndexJob.java | 10 +-
.../mr/invertedindex/InvertedIndexMapper.java | 32 +--
.../mr/invertedindex/InvertedIndexReducer.java | 89 +++++---
.../UpdateIIInfoAfterBuildStep.java | 84 +++++++
.../UpdateInvertedIndexInfoAfterBuildStep.java | 93 --------
.../kylin/engine/streaming/IStreamingInput.java | 2 +
.../streaming/OneOffStreamingBuilder.java | 1 +
.../kylin/engine/streaming/StreamingBatch.java | 61 ------
.../engine/streaming/StreamingBatchBuilder.java | 1 +
.../engine/streaming/StreamingMessage.java | 43 ----
.../streaming/cube/StreamingCubeBuilder.java | 4 +-
.../streaming/invertedindex/SliceBuilder.java | 81 -------
.../invertedindex/test_kylin_ii_inner_join.json | 45 +---
.../invertedindex/test_kylin_ii_left_join.json | 45 +---
.../invertedindex/test_streaming_table_ii.json | 22 +-
.../apache/kylin/invertedindex/IIManager.java | 50 +----
.../apache/kylin/invertedindex/IISegment.java | 51 +----
.../kylin/invertedindex/index/SliceBuilder.java | 77 +++++++
.../invertedindex/IIInstanceTest.java | 5 -
.../apache/kylin/source/hive/HiveMRInput.java | 3 +
.../kylin/source/kafka/KafkaStreamingInput.java | 4 +-
.../kylin/source/kafka/StreamingParser.java | 2 +-
.../source/kafka/StringStreamingParser.java | 2 +-
.../source/kafka/TimedJsonStreamParser.java | 2 +-
.../kafka/diagnose/KafkaInputAnalyzer.java | 2 +-
.../kylin/source/kafka/util/KafkaUtils.java | 2 +-
.../storage/hbase/ii/IICreateHFileMapper.java | 19 +-
.../storage/hbase/ii/IICreateHTableJob.java | 31 ++-
.../storage/hbase/steps/HBaseMROutput.java | 2 +-
.../kylin/storage/hbase/steps/HBaseMRSteps.java | 18 +-
.../hbase/util/DeployCoprocessorCLI.java | 3 +-
.../storage/hbase/util/StorageCleanupJob.java | 2 -
.../hbase/common/TsConditionEraserTest.java | 6 +-
.../endpoint/EndpointAggregationTest.java | 27 ++-
.../endpoint/TableRecordInfoTest.java | 5 +
.../endpoint/TsConditionExtractorTest.java | 5 +
50 files changed, 469 insertions(+), 1300 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index b64a7c5..2d40b09 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -51,8 +51,8 @@ import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.streaming.StreamingBatch;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
@@ -61,7 +61,7 @@ import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.engine.streaming.invertedindex.SliceBuilder;
+import org.apache.kylin.invertedindex.index.SliceBuilder;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
@@ -196,14 +196,13 @@ public class BuildIIWithStreamTest {
int count = sorted.size();
ArrayList<StreamingMessage> messages = Lists.newArrayList();
for (String[] row : sorted) {
- if (messages.size() < iiDesc.getSliceSize()) {
- messages.add(parse(row));
- } else {
+ messages.add((parse(row)));
+ if (messages.size() >= iiDesc.getSliceSize()) {
build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
- messages = Lists.newArrayList();
- messages.add((parse(row)));
+ messages.clear();
}
}
+
if (!messages.isEmpty()) {
build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 200156a..a393ce3 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -23,10 +23,11 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.common.util.FIFOIterable;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.engine.streaming.StreamingBatch;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.Slice;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
@@ -35,7 +36,7 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
import org.apache.kylin.invertedindex.model.IIRow;
import org.apache.kylin.invertedindex.model.KeyValueCodec;
-import org.apache.kylin.engine.streaming.invertedindex.SliceBuilder;
+import org.apache.kylin.invertedindex.index.SliceBuilder;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -146,6 +147,10 @@ public class IITest extends LocalFileMetadataTestCase {
@Test
public void IIEndpointTest() {
TableRecordInfo info = new TableRecordInfo(ii.getDescriptor());
+ if (ii.getFirstSegment() == null) {
+ IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+ ii.getSegments().add(segment);
+ }
CoprocessorRowType type = CoprocessorRowType.fromTableRecordInfo(info, ii.getFirstSegment().getColumns());
CoprocessorProjector projector = CoprocessorProjector.makeForEndpoint(info, Collections.singletonList(ii.getDescriptor().findColumnRef("default.test_kylin_fact", "lstg_format_name")));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java
new file mode 100644
index 0000000..ae5b72f
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java
@@ -0,0 +1,59 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+package org.apache.kylin.common.util;
+
+import java.util.List;
+
+/**
+ */
+public final class StreamingBatch {
+
+ private final List<StreamingMessage> messages;
+
+ private final Pair<Long, Long> timeRange;
+
+ public StreamingBatch(List<StreamingMessage> messages, Pair<Long, Long> timeRange) {
+ this.messages = messages;
+ this.timeRange = timeRange;
+ }
+
+ public List<StreamingMessage> getMessages() {
+ return messages;
+ }
+
+ public Pair<Long, Long> getTimeRange() {
+ return timeRange;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
new file mode 100644
index 0000000..2c150ff
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
@@ -0,0 +1,43 @@
+package org.apache.kylin.common.util;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class StreamingMessage {
+
+ private final List<String> data;
+
+ private long offset;
+
+ private long timestamp;
+
+ private Map<String, Object> params;
+
+ public static final StreamingMessage EOF = new StreamingMessage(Collections.<String> emptyList(), 0L, 0L, Collections.<String, Object> emptyMap());
+
+ public StreamingMessage(List<String> data, long offset, long timestamp, Map<String, Object> params) {
+ this.data = data;
+ this.offset = offset;
+ this.timestamp = timestamp;
+ this.params = params;
+ }
+
+ public final List<String> getData() {
+ return data;
+ }
+
+ public final long getOffset() {
+ return offset;
+ }
+
+ public final long getTimestamp() {
+ return timestamp;
+ }
+
+ public Map<String, Object> getParams() {
+ return params;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
index afab86b..3ac82a0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
@@ -4,7 +4,6 @@ import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
/**
- * Created by shaoshi on 10/30/15.
*/
public interface IRealizationSegment extends IBuildable {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 8c770f9..e3b07d8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -25,13 +25,12 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.invertedindex.UpdateInvertedIndexInfoAfterBuildStep;
+import org.apache.kylin.engine.mr.invertedindex.UpdateIIInfoAfterBuildStep;
import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
-import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
@@ -125,11 +124,10 @@ public class JobBuilderSupport {
- public UpdateInvertedIndexInfoAfterBuildStep createUpdateInvertedIndexInfoAfterBuildStep(String jobId) {
- final UpdateInvertedIndexInfoAfterBuildStep updateIIInfoStep = new UpdateInvertedIndexInfoAfterBuildStep();
+ public UpdateIIInfoAfterBuildStep createUpdateIIInfoAfterBuildStep(String jobId) {
+ final UpdateIIInfoAfterBuildStep updateIIInfoStep = new UpdateIIInfoAfterBuildStep();
updateIIInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_II_INFO);
updateIIInfoStep.setInvertedIndexName(seg.getRealization().getName());
- updateIIInfoStep.setSegmentId(seg.getUuid());
updateIIInfoStep.setJobId(jobId);
return updateIIInfoStep;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 8782bbe..366a730 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -337,9 +337,6 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
TableDesc table = metaMgr.getTableDesc(tableName);
dumpList.add(table.getResourcePath());
}
- for (IISegment segment : ii.getSegments()) {
- dumpList.addAll(segment.getDictionaryPaths());
- }
attachKylinPropsAndMetadata(dumpList, conf);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
index 97e27d0..e7501b8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
@@ -18,30 +18,16 @@
package org.apache.kylin.engine.mr.invertedindex;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMROutput;
-import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
-import org.apache.kylin.engine.mr.steps.NDCuboidJob;
import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.IRealizationSegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
public class BatchIIJobBuilder extends JobBuilderSupport {
private static final Logger logger = LoggerFactory.getLogger(BatchIIJobBuilder.class);
@@ -64,54 +50,20 @@ public class BatchIIJobBuilder extends JobBuilderSupport {
final String iiRootPath = getRealizationRootPath(jobId) + "/";
// Phase 1: Create Flat Table
inputSide.addStepPhase1_CreateFlatTable(result);
-
- final String intermediateTableIdentity = seg.getJoinedFlatTableDesc().getTableName();
- // Phase 2: Build Dictionary
- result.addTask(createIIFactDistinctColumnsStep(seg, intermediateTableIdentity, getFactDistinctColumnsPath(jobId)));
- result.addTask(createIIBuildDictionaryStep(seg, getFactDistinctColumnsPath(jobId)));
-
- // Phase 3: Build Cube
- result.addTask(createInvertedIndexStep((IISegment)seg, intermediateTableIdentity, iiRootPath));
+
+ // Phase 2: Build Inverted Index
+ result.addTask(createInvertedIndexStep((IISegment)seg, iiRootPath));
outputSide.addStepPhase3_BuildII(result, iiRootPath);
- // Phase 4: Update Metadata & Cleanup
- result.addTask(createUpdateInvertedIndexInfoAfterBuildStep(jobId));
+ // Phase 3: Update Metadata & Cleanup
+ result.addTask(createUpdateIIInfoAfterBuildStep(jobId));
inputSide.addStepPhase4_Cleanup(result);
outputSide.addStepPhase4_Cleanup(result);
return result;
}
- private MapReduceExecutable createIIFactDistinctColumnsStep(IRealizationSegment seg, String factTableName, String output) {
- MapReduceExecutable result = new MapReduceExecutable();
- result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
- result.setMapReduceJobClass(IIDistinctColumnsJob.class);
- StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
- appendExecCmdParameters(cmd, "tablename", factTableName);
- appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "output", output);
- appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
-
- result.setMapReduceParams(cmd.toString());
- return result;
- }
-
- private HadoopShellExecutable createIIBuildDictionaryStep(IRealizationSegment seg, String factDistinctColumnsPath) {
- // base cuboid job
- HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
- buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
- StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
-
- buildDictionaryStep.setJobParams(cmd.toString());
- buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class);
- return buildDictionaryStep;
- }
-
- private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) {
- // base cuboid job
+ private MapReduceExecutable createInvertedIndexStep(IISegment seg, String iiOutputTempPath) {
MapReduceExecutable buildIIStep = new MapReduceExecutable();
StringBuilder cmd = new StringBuilder();
@@ -120,7 +72,6 @@ public class BatchIIJobBuilder extends JobBuilderSupport {
buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "tablename", intermediateHiveTable);
appendExecCmdParameters(cmd, "output", iiOutputTempPath);
appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java
deleted file mode 100644
index 39d74b4..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.engine.mr.DFSFileTable;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
-
-/**
- */
-public class CreateInvertedIndexDictionaryJob extends AbstractHadoopJob {
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_II_NAME);
- options.addOption(OPTION_INPUT_PATH);
- parseOptions(options, args);
-
- final String iiname = getOptionValue(OPTION_II_NAME);
- final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
- final KylinConfig config = KylinConfig.getInstanceFromEnv();
-
- IIManager mgr = IIManager.getInstance(config);
- IIInstance ii = mgr.getII(iiname);
-
- mgr.buildInvertedIndexDictionary(ii.getFirstSegment(), new DistinctColumnValuesProvider() {
- @Override
- public ReadableTable getDistinctValuesFor(TblColRef col) {
- return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1);
- }
- });
- return 0;
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new CreateInvertedIndexDictionaryJob(), args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java
deleted file mode 100644
index 651ad63..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.engine.mr.KylinReducer;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsCombiner extends KylinReducer<ShortWritable, Text, ShortWritable, Text> {
-
- private Text outputValue = new Text();
-
- @Override
- protected void setup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
-
- }
-
- @Override
- public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
- HashSet<ByteArray> set = new HashSet<ByteArray>();
- for (Text textValue : values) {
- ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
- set.add(value);
- }
-
- for (ByteArray value : set) {
- outputValue.set(value.array(), value.offset(), value.length());
- context.write(key, outputValue);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
deleted file mode 100644
index fe968b1..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsJob extends AbstractHadoopJob {
- protected static final Logger logger = LoggerFactory.getLogger(IIDistinctColumnsJob.class);
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_TABLE_NAME);
- options.addOption(OPTION_II_NAME);
- options.addOption(OPTION_OUTPUT_PATH);
- parseOptions(options, args);
-
- job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
- String tableName = getOptionValue(OPTION_TABLE_NAME).toUpperCase();
- String iiName = getOptionValue(OPTION_II_NAME);
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
- // ----------------------------------------------------------------------------
-
- logger.info("Starting: " + job.getJobName() + " on table " + tableName);
-
- IIManager iiMgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
- IIInstance ii = iiMgr.getII(iiName);
- job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName);
- job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(ii));
-
- setJobClasspath(job);
-
-
- setupMapper(ii.getFirstSegment());
- setupReducer(output);
-
- Configuration conf = job.getConfiguration();
- conf.set(BatchConstants.CFG_II_NAME, ii.getName());
- attachKylinPropsAndMetadata(ii, conf);
- return waitForCompletion(job);
-
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
-
- }
-
- private String getColumns(IIInstance ii) {
- IIJoinedFlatTableDesc iiflat = new IIJoinedFlatTableDesc(ii.getDescriptor());
- StringBuilder buf = new StringBuilder();
- for (IntermediateColumnDesc col : iiflat.getColumnList()) {
- if (buf.length() > 0)
- buf.append(",");
- buf.append(col.getColumnName());
- }
- return buf.toString();
- }
-
- private void setupMapper(IISegment segment) throws IOException {
-
- IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
- flatTableInputFormat.configureJob(job);
-
- job.setMapperClass(IIDistinctColumnsMapper.class);
- job.setCombinerClass(IIDistinctColumnsCombiner.class);
- job.setMapOutputKeyClass(ShortWritable.class);
- job.setMapOutputValueClass(Text.class);
- }
-
- private void setupReducer(Path output) throws IOException {
- job.setReducerClass(IIDistinctColumnsReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Text.class);
-
- FileOutputFormat.setOutputPath(job, output);
- job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
-
- job.setNumReduceTasks(1);
-
- deletePath(job.getConfiguration(), output);
- }
-
- public static void main(String[] args) throws Exception {
- IIDistinctColumnsJob job = new IIDistinctColumnsJob();
- int exitCode = ToolRunner.run(job, args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
deleted file mode 100644
index c431ecd..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, Object, ShortWritable, Text> {
-
- private ShortWritable outputKey = new ShortWritable();
- private Text outputValue = new Text();
-
- protected IMRInput.IMRTableInputFormat flatTableInputFormat;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
- Configuration conf = context.getConfiguration();
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
- String iiName = conf.get(BatchConstants.CFG_II_NAME);
- IIInstance iiInstance = IIManager.getInstance(config).getII(iiName);
- IISegment seg = iiInstance.getFirstSegment();
- flatTableInputFormat = MRUtil.getBatchCubingInputSide(seg).getFlatTableInputFormat();
- }
-
- @Override
- public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
-
- String[] row = flatTableInputFormat.parseMapperInput(record);
-
- for (short i = 0; i < row.length; i++) {
- outputKey.set(i);
- if (row[i] == null)
- continue;
- byte[] bytes = Bytes.toBytes(row[i].toString());
- outputValue.set(bytes, 0, bytes.length);
- context.write(outputKey, outputValue);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java
deleted file mode 100644
index d50385f..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> {
-
- private String[] columns;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
-
- Configuration conf = context.getConfiguration();
- this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
- }
-
- @Override
- public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- String columnName = columns[key.get()];
-
- HashSet<ByteArray> set = new HashSet<ByteArray>();
- for (Text textValue : values) {
- ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
- set.add(value);
- }
-
- Configuration conf = context.getConfiguration();
- FileSystem fs = FileSystem.get(conf);
- String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
- FSDataOutputStream out = fs.create(new Path(outputPath, columnName));
-
- try {
- for (ByteArray value : set) {
- out.write(value.array(), value.offset(), value.length());
- out.write('\n');
- }
- } finally {
- out.close();
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java
deleted file mode 100644
index 18d3001..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java
+++ /dev/null
@@ -1,219 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-//*/
-//
-//package org.apache.kylin.engine.mr.invertedindex;
-//
-//import java.io.IOException;
-//import java.text.SimpleDateFormat;
-//import java.util.Date;
-//import java.util.TimeZone;
-//
-//import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
-//import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-//import org.apache.kylin.invertedindex.IISegment;
-//import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-//import org.apache.kylin.job.constant.ExecutableConstants;
-//import org.apache.kylin.job.engine.JobEngineConfig;
-//import org.apache.kylin.job.execution.AbstractExecutable;
-//import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-//
-//import com.google.common.base.Preconditions;
-//
-///**
-// */
-//public final class IIJobBuilder {
-//
-// final JobEngineConfig engineConfig;
-//
-// public IIJobBuilder(JobEngineConfig engineConfig) {
-// this.engineConfig = engineConfig;
-// }
-//
-// public IIJob buildJob(IISegment seg, String submitter) {
-// checkPreconditions(seg);
-//
-// IIJob result = initialJob(seg, "BUILD", submitter);
-// final String jobId = result.getId();
-// final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc());
-// final String intermediateTableIdentity = getIntermediateTableIdentity(intermediateTableDesc);
-// final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId);
-// final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/";
-// final String iiPath = iiRootPath + "*";
-//
-// final AbstractExecutable intermediateHiveTableStep = createFlatHiveTableStep(intermediateTableDesc, jobId);
-// result.addTask(intermediateHiveTableStep);
-//
-// result.addTask(createFactDistinctColumnsStep(seg, intermediateTableIdentity, jobId, factDistinctColumnsPath));
-//
-// result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
-//
-// result.addTask(createInvertedIndexStep(seg, intermediateTableIdentity, iiRootPath));
-//
-// // create htable step
-// result.addTask(createCreateHTableStep(seg));
-//
-// // generate hfiles step
-// result.addTask(createConvertToHfileStep(seg, iiPath, jobId));
-//
-// // bulk load step
-// result.addTask(createBulkLoadStep(seg, jobId));
-//
-// return result;
-// }
-//
-// private IIJob initialJob(IISegment seg, String type, String submitter) {
-// IIJob result = new IIJob();
-// SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
-// format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone()));
-// result.setIIName(seg.getIIInstance().getName());
-// result.setSegmentId(seg.getUuid());
-// result.setName(seg.getIIInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis())));
-// result.setSubmitter(submitter);
-// return result;
-// }
-//
-// private void checkPreconditions(IISegment seg) {
-// Preconditions.checkNotNull(seg, "segment cannot be null");
-// Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null");
-// }
-//
-// private void appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) {
-// try {
-// String jobConf = engineConfig.getHadoopJobConfFilePath(RealizationCapacity.MEDIUM);
-// if (jobConf != null && jobConf.length() > 0) {
-// builder.append(" -conf ").append(jobConf);
-// }
-// } catch (IOException e) {
-// throw new RuntimeException(e);
-// }
-// }
-//
-// private String getIIDistinctColumnsPath(IISegment seg, String jobUuid) {
-// return getJobWorkingDir(jobUuid) + "/" + seg.getIIInstance().getName() + "/ii_distinct_columns";
-// }
-//
-// private String getHFilePath(IISegment seg, String jobId) {
-// return getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/hfile/";
-// }
-//
-// private MapReduceExecutable createFactDistinctColumnsStep(IISegment seg, String factTableName, String jobId, String output) {
-// MapReduceExecutable result = new MapReduceExecutable();
-// result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-// result.setMapReduceJobClass(IIDistinctColumnsJob.class);
-// StringBuilder cmd = new StringBuilder();
-// appendMapReduceParameters(cmd, engineConfig);
-// appendExecCmdParameters(cmd, "tablename", factTableName);
-// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-// appendExecCmdParameters(cmd, "output", output);
-// appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getIIInstance().getName() + "_Step");
-//
-// result.setMapReduceParams(cmd.toString());
-// return result;
-// }
-//
-// private HadoopShellExecutable createBuildDictionaryStep(IISegment seg, String factDistinctColumnsPath) {
-// // base cuboid job
-// HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
-// buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
-// StringBuilder cmd = new StringBuilder();
-// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-// appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
-//
-// buildDictionaryStep.setJobParams(cmd.toString());
-// buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class);
-// return buildDictionaryStep;
-// }
-//
-// private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) {
-// // base cuboid job
-// MapReduceExecutable buildIIStep = new MapReduceExecutable();
-//
-// StringBuilder cmd = new StringBuilder();
-// appendMapReduceParameters(cmd, engineConfig);
-//
-// buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
-//
-// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-// appendExecCmdParameters(cmd, "tablename", intermediateHiveTable);
-// appendExecCmdParameters(cmd, "output", iiOutputTempPath);
-// appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II);
-//
-// buildIIStep.setMapReduceParams(cmd.toString());
-// buildIIStep.setMapReduceJobClass(InvertedIndexJob.class);
-// return buildIIStep;
-// }
-//
-// private HadoopShellExecutable createCreateHTableStep(IISegment seg) {
-// HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
-// createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
-// StringBuilder cmd = new StringBuilder();
-// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-// appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-//
-// createHtableStep.setJobParams(cmd.toString());
-// createHtableStep.setJobClass(IICreateHTableJob.class);
-//
-// return createHtableStep;
-// }
-//
-// private MapReduceExecutable createConvertToHfileStep(IISegment seg, String inputPath, String jobId) {
-// MapReduceExecutable createHFilesStep = new MapReduceExecutable();
-// createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_II_TO_HFILE);
-// StringBuilder cmd = new StringBuilder();
-//
-// appendMapReduceParameters(cmd, engineConfig);
-// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-// appendExecCmdParameters(cmd, "input", inputPath);
-// appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
-// appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-// appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getIIInstance().getName() + "_Step");
-//
-// createHFilesStep.setMapReduceParams(cmd.toString());
-// createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class);
-//
-// return createHFilesStep;
-// }
-//
-// private HadoopShellExecutable createBulkLoadStep(IISegment seg, String jobId) {
-// HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
-// bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-//
-// StringBuilder cmd = new StringBuilder();
-// appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId));
-// appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-//
-// bulkLoadStep.setJobParams(cmd.toString());
-// bulkLoadStep.setJobClass(IIBulkLoadJob.class);
-//
-// return bulkLoadStep;
-//
-// }
-//
-// private StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) {
-// return buf.append(" -").append(paraName).append(" ").append(paraValue);
-// }
-//
-// private String getJobWorkingDir(String uuid) {
-// return engineConfig.getHdfsWorkingDirectory() + "kylin-" + uuid;
-// }
-//
-// private String getIntermediateTableIdentity(IIJoinedFlatTableDesc intermediateTableDesc) {
-// return engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + "." + intermediateTableDesc.getTableName();
-// }
-//}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
index bcae524..5191aca 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
@@ -44,7 +45,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * @author yangli9
*/
public class InvertedIndexJob extends AbstractHadoopJob {
protected static final Logger logger = LoggerFactory.getLogger(InvertedIndexJob.class);
@@ -56,13 +56,11 @@ public class InvertedIndexJob extends AbstractHadoopJob {
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_II_NAME);
- options.addOption(OPTION_TABLE_NAME);
options.addOption(OPTION_OUTPUT_PATH);
parseOptions(options, args);
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
String iiname = getOptionValue(OPTION_II_NAME);
- String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
// ----------------------------------------------------------------------------
@@ -111,17 +109,11 @@ public class InvertedIndexJob extends AbstractHadoopJob {
private void setupMapper(IISegment segment) throws IOException {
-// String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
-// HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
-//
-// job.setInputFormatClass(HCatInputFormat.class);
IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
flatTableInputFormat.configureJob(job);
-
job.setMapperClass(InvertedIndexMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
- job.setMapOutputValueClass(ImmutableBytesWritable.class);
job.setPartitionerClass(InvertedIndexPartitioner.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
index 88249ed..a1251a3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
@@ -18,13 +18,11 @@
package org.apache.kylin.engine.mr.invertedindex;
-import java.io.IOException;
-import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.MRUtil;
@@ -33,20 +31,19 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.TableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import java.io.IOException;
+
/**
* @author yangli9
*/
-public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongWritable, ImmutableBytesWritable> {
+public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongWritable, Writable> {
private TableRecordInfo info;
- private TableRecord rec;
private LongWritable outputKey;
- private ImmutableBytesWritable outputValue;
private IMRInput.IMRTableInputFormat flatTableInputFormat;
@Override
@@ -60,10 +57,8 @@ public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongW
IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
this.info = new TableRecordInfo(seg);
- this.rec = this.info.createTableRecord();
outputKey = new LongWritable();
- outputValue = new ImmutableBytesWritable(rec.getBytes());
flatTableInputFormat = MRUtil.getBatchCubingInputSide(ii.getFirstSegment()).getFlatTableInputFormat();
}
@@ -71,17 +66,12 @@ public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongW
@Override
public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
- String[] row = flatTableInputFormat.parseMapperInput(record);
- rec.reset();
- for (int i = 0; i < row.length; i++) {
- Object fieldValue = row[i];
- if (fieldValue != null)
- rec.setValueString(i, fieldValue.toString());
- }
-
- outputKey.set(rec.getTimestamp());
- // outputValue's backing bytes array is the same as rec
+ Writable writableRecord = (Writable) record;
+ String[] row = flatTableInputFormat.parseMapperInput(writableRecord);
+ String timestampString = row[info.getTimestampColumn()];
- context.write(outputKey, outputValue);
+ outputKey.set(DateFormat.stringToMillis(timestampString));
+ //
+ context.write(outputKey, writableRecord);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
index 7644456..56c0b9e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
@@ -18,35 +18,45 @@
package org.apache.kylin.engine.mr.invertedindex;
-import java.io.IOException;
-
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.IncrementalSliceMaker;
import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.SliceBuilder;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
/**
- * @author yangli9
*/
-public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
+public class InvertedIndexReducer extends KylinReducer<LongWritable, Object, ImmutableBytesWritable, ImmutableBytesWritable> {
private TableRecordInfo info;
- private TableRecord rec;
- private IncrementalSliceMaker builder;
private IIKeyValueCodec kv;
+ private IMRInput.IMRTableInputFormat flatTableInputFormat;
+ private SliceBuilder sliceBuilder;
+ private ArrayList<StreamingMessage> messages;
+ private int sliceSize;
+ private ImmutableBytesWritable immutableBytesWritable;
+ private ByteBuffer valueBuf;
@Override
protected void setup(Context context) throws IOException {
@@ -56,44 +66,61 @@ public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBy
KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
IIManager mgr = IIManager.getInstance(config);
IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
- IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
+ IISegment seg = ii.getFirstSegment();
info = new TableRecordInfo(seg);
- rec = info.createTableRecord();
- builder = null;
kv = new IIKeyValueCodec(info.getDigest());
+ flatTableInputFormat = MRUtil.getBatchCubingInputSide(ii.getFirstSegment()).getFlatTableInputFormat();
+ sliceSize = ii.getDescriptor().getSliceSize();
+ short shard = (short) context.getTaskAttemptID().getTaskID().getId();
+ System.out.println("Generating to shard - " + shard);
+ sliceBuilder = new SliceBuilder(seg.getIIDesc(), shard, true);
+ messages = Lists.newArrayListWithCapacity(sliceSize);
+ immutableBytesWritable = new ImmutableBytesWritable();
+ valueBuf = ByteBuffer.allocate(1024 * 1024); // 1MB
}
@Override
- public void reduce(LongWritable key, Iterable<ImmutableBytesWritable> values, Context context) //
+ public void reduce(LongWritable key, Iterable<Object> values, Context context) //
throws IOException, InterruptedException {
- for (ImmutableBytesWritable v : values) {
- rec.setBytes(v.get(), v.getOffset(), v.getLength());
-
- if (builder == null) {
- builder = new IncrementalSliceMaker(info, rec.getShard());
- }
-
- //TODO: to delete this log
- System.out.println(rec.getShard() + " - " + rec);
-
- Slice slice = builder.append(rec);
- if (slice != null) {
- output(slice, context);
+ for (Object v : values) {
+ String[] row = flatTableInputFormat.parseMapperInput(v);
+ messages.add((parse(row)));
+ if (messages.size() >= sliceSize) {
+ buildAndOutput(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), context);
+ messages = Lists.newArrayList();
}
}
}
+ private StreamingMessage parse(String[] row) {
+ return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object> emptyMap());
+ }
+
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
- Slice slice = builder.close();
- if (slice != null) {
- output(slice, context);
+ if (!messages.isEmpty()) {
+ buildAndOutput(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), context);
+ messages.clear();
}
+
}
- private void output(Slice slice, Context context) throws IOException, InterruptedException {
+ private void buildAndOutput(StreamingBatch streamingBatch, Context context) throws IOException, InterruptedException {
+ final Slice slice = sliceBuilder.buildSlice(streamingBatch);
+ ImmutableBytesWritable value, dictionary;
for (IIRow pair : kv.encodeKeyValue(slice)) {
- context.write(pair.getKey(), pair.getValue());
+ value = pair.getValue();
+ dictionary = pair.getDictionary();
+ int newLength = 4 + value.getLength() + dictionary.getLength();
+ if (newLength > valueBuf.limit()) {
+ valueBuf = ByteBuffer.allocate(newLength);
+ }
+ valueBuf.clear();
+ valueBuf.putInt(value.getLength());
+ valueBuf.put(value.get(), value.getOffset(), value.getLength());
+ valueBuf.put(dictionary.get(), dictionary.getOffset(), dictionary.getLength());
+ immutableBytesWritable.set(valueBuf.array(), 0, newLength);
+ context.write(pair.getKey(), immutableBytesWritable);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java
new file mode 100644
index 0000000..ef9b1c3
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+import java.io.IOException;
+
+/**
+ */
+public class UpdateIIInfoAfterBuildStep extends AbstractExecutable {
+
+ private static final String II_NAME = "iiName";
+ private static final String JOB_ID = "jobId";
+
+ public UpdateIIInfoAfterBuildStep() {
+ super();
+ }
+
+ public void setInvertedIndexName(String cubeName) {
+ this.setParam(II_NAME, cubeName);
+ }
+
+ private String getInvertedIndexName() {
+ return getParam(II_NAME);
+ }
+
+ public void setJobId(String id) {
+ setParam(JOB_ID, id);
+ }
+
+ private String getJobId() {
+ return getParam(JOB_ID);
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+
+ IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+ IIInstance ii = mgr.getII(getInvertedIndexName());
+ IISegment segment = ii.getFirstSegment();
+ segment.setStatus(SegmentStatusEnum.READY);
+
+ segment.setLastBuildJobID(getJobId());
+ segment.setLastBuildTime(System.currentTimeMillis());
+
+ try {
+ mgr.updateII(ii);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ } catch (IOException e) {
+ logger.error("fail to update inverted index after build", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java
deleted file mode 100644
index 277dea5..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-import java.io.IOException;
-
-/**
- */
-public class UpdateInvertedIndexInfoAfterBuildStep extends AbstractExecutable {
-
- private static final String SEGMENT_ID = "segmentId";
- private static final String II_NAME = "iiName";
- private static final String JOB_ID = "jobId";
-
- public UpdateInvertedIndexInfoAfterBuildStep() {
- super();
- }
-
- public void setInvertedIndexName(String cubeName) {
- this.setParam(II_NAME, cubeName);
- }
-
- private String getInvertedIndexName() {
- return getParam(II_NAME);
- }
-
- public void setSegmentId(String segmentId) {
- this.setParam(SEGMENT_ID, segmentId);
- }
-
- private String getSegmentId() {
- return getParam(SEGMENT_ID);
- }
-
- public void setJobId(String id) {
- setParam(JOB_ID, id);
- }
-
- private String getJobId() {
- return getParam(JOB_ID);
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-
- IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
- IIInstance ii = mgr.getII(getInvertedIndexName());
- IISegment segment = ii.getFirstSegment();
- segment.setStatus(SegmentStatusEnum.READY);
-
- segment.setLastBuildJobID(getJobId());
- segment.setLastBuildTime(System.currentTimeMillis());
-
- try {
- mgr.updateII(ii);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } catch (IOException e) {
- logger.error("fail to update inverted index after build", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
index d8cb24b..1cf3d98 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
@@ -33,6 +33,8 @@
*/
package org.apache.kylin.engine.streaming;
+import org.apache.kylin.common.util.StreamingBatch;
+
/**
*/
public interface IStreamingInput {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
index 475e43a..4a3b8b8 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
@@ -36,6 +36,7 @@ package org.apache.kylin.engine.streaming;
import java.util.Map;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.StreamingBatch;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.engine.streaming.util.StreamingUtils;
import org.apache.kylin.metadata.model.IBuildable;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java
deleted file mode 100644
index c7c7d29..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-package org.apache.kylin.engine.streaming;
-
-import java.util.List;
-
-import org.apache.kylin.common.util.Pair;
-
-/**
- */
-public final class StreamingBatch {
-
- private final List<StreamingMessage> messages;
-
- private final Pair<Long, Long> timeRange;
-
- public StreamingBatch(List<StreamingMessage> messages, Pair<Long, Long> timeRange) {
- this.messages = messages;
- this.timeRange = timeRange;
- }
-
- public List<StreamingMessage> getMessages() {
- return messages;
- }
-
- public Pair<Long, Long> getTimeRange() {
- return timeRange;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
index d55ccdb..93cda2d 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
@@ -36,6 +36,7 @@ package org.apache.kylin.engine.streaming;
import java.util.Map;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.StreamingBatch;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.metadata.model.IBuildable;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java
deleted file mode 100644
index 7885902..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.kylin.engine.streaming;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class StreamingMessage {
-
- private final List<String> data;
-
- private long offset;
-
- private long timestamp;
-
- private Map<String, Object> params;
-
- public static final StreamingMessage EOF = new StreamingMessage(Collections.<String> emptyList(), 0L, 0L, Collections.<String, Object> emptyMap());
-
- public StreamingMessage(List<String> data, long offset, long timestamp, Map<String, Object> params) {
- this.data = data;
- this.offset = offset;
- this.timestamp = timestamp;
- this.params = params;
- }
-
- public final List<String> getData() {
- return data;
- }
-
- public final long getOffset() {
- return offset;
- }
-
- public final long getTimestamp() {
- return timestamp;
- }
-
- public Map<String, Object> getParams() {
- return params;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index 64a3061..ae72218 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -54,9 +54,9 @@ import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
import org.apache.kylin.cube.util.CubingUtils;
import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.common.util.StreamingBatch;
import org.apache.kylin.engine.streaming.StreamingBatchBuilder;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java
deleted file mode 100644
index fa5a0b2..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming.invertedindex;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.engine.streaming.StreamingBatch;
-import org.apache.kylin.engine.streaming.StreamingMessage;
-import org.apache.kylin.invertedindex.index.BatchSliceMaker;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.util.List;
-
-/**
- */
-public final class SliceBuilder {
-
- private static Logger logger = LoggerFactory.getLogger(SliceBuilder.class);
-
- private final BatchSliceMaker sliceMaker;
- private final IIDesc iiDesc;
- private final boolean useLocalDict;
-
- public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) {
- this.iiDesc = desc;
- this.sliceMaker = new BatchSliceMaker(desc, shard);
- this.useLocalDict = useLocalDict;
- }
-
- public Slice buildSlice(StreamingBatch microStreamBatch) {
- final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
- @Nullable
- @Override
- public List<String> apply(@Nullable StreamingMessage input) {
- return input.getData();
- }
- });
- final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
- TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
- return build(messages, tableRecordInfo, dictionaries);
- }
-
- private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
- final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
- @Nullable
- @Override
- public TableRecord apply(@Nullable List<String> input) {
- TableRecord result = tableRecordInfo.createTableRecord();
- for (int i = 0; i < input.size(); i++) {
- result.setValueString(i, input.get(i));
- }
- return result;
- }
- }));
- slice.setLocalDictionaries(localDictionary);
- return slice;
- }
-}
[2/3] incubator-kylin git commit: KYLIN-1116 Use local dictionary for
InvertedIndex batch building
Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
index 237bdd7..4c0c3ef 100644
--- a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
+++ b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
@@ -4,49 +4,8 @@
"owner": null,
"version": null,
"cost": 10,
- "status": "READY",
- "segments": [
- {
- "uuid": null,
- "name": "19700101000000_20140901000000",
- "status": "READY",
- "dictionaries": {
- "DEFAULT.TEST_KYLIN_FACT/LSTG_SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID/30c9d5f0-abe4-4d1c-a147-610234d90ff1.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/9355165b-06ad-4c04-977c-a251e66e7e98.dict",
- "DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME": "/dict/DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME/d5c40465-75e1-40bc-a960-06308f0134a6.dict",
- "DEFAULT.TEST_KYLIN_FACT/SELLER_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/SELLER_ID/fea3a18d-3f20-4b8b-a880-7af93e69241b.dict",
- "EDW.TEST_SITES/CRE_USER": "/dict/EDW.TEST_SITES/CRE_USER/244af7a2-7352-4b30-811f-46e637d7a133.dict",
- "DEFAULT.TEST_KYLIN_FACT/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
- "EDW.TEST_CAL_DT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
- "DEFAULT.TEST_KYLIN_FACT/TRANS_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/TRANS_ID/7fb8fed8-7f3b-4089-a85d-3ac07f575c82.dict",
- "EDW.TEST_CAL_DT/WEEK_BEG_DT": "/dict/EDW.TEST_CAL_DT/WEEK_BEG_DT/962b5f64-bee1-49ee-a072-af882193b719.dict",
- "DEFAULT.TEST_KYLIN_FACT/SLR_SEGMENT_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
- "EDW.TEST_SITES/SITE_NAME": "/dict/EDW.TEST_SITES/SITE_NAME/f363531d-e969-4264-bffd-ac18f8f47220.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE/1fc93a94-1feb-4af4-8078-81a6f1b65e2b.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME/895739d6-27e1-4ecc-b798-5851c319ea40.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
- "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
- "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC/bfb86010-bf4c-4534-a2b0-59d020aed197.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1/8ae44fb8-b01a-4db1-a901-dc5f463038cb.dict",
- "EDW.TEST_SITES/SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
- "DEFAULT.TEST_KYLIN_FACT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3/278d7283-518a-4cd0-b6e1-2573b523bf1f.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER/bb1bb7a5-b02d-45eb-b3c8-f1a4515264ca.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/b2d6fae1-eaac-4ac2-8a01-42e5c8b5c198.dict"
- },
- "storage_location_identifier": "test_III",
- "date_range_start": 0,
- "date_range_end": 0,
- "size_kb": 0,
- "input_records": 0,
- "input_records_size": 0,
- "last_build_time": 0,
- "last_build_job_id": null,
- "create_time": null,
- "binary_signature": null
- }
- ],
+ "status": "DISABLED",
+ "segments": [],
"last_modified": 1420016227424,
"descriptor": "test_kylin_ii_inner_join_desc",
"create_time": null,
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
index 07c1970..90c21bb 100644
--- a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
+++ b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
@@ -4,49 +4,8 @@
"owner": null,
"version": null,
"cost": 10,
- "status": "READY",
- "segments": [
- {
- "uuid": null,
- "name": "19700101000000_20140901000000",
- "status": "READY",
- "dictionaries": {
- "DEFAULT.TEST_KYLIN_FACT/LSTG_SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID/30c9d5f0-abe4-4d1c-a147-610234d90ff1.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/9355165b-06ad-4c04-977c-a251e66e7e98.dict",
- "DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME": "/dict/DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME/d5c40465-75e1-40bc-a960-06308f0134a6.dict",
- "DEFAULT.TEST_KYLIN_FACT/SELLER_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/SELLER_ID/fea3a18d-3f20-4b8b-a880-7af93e69241b.dict",
- "EDW.TEST_SITES/CRE_USER": "/dict/EDW.TEST_SITES/CRE_USER/244af7a2-7352-4b30-811f-46e637d7a133.dict",
- "DEFAULT.TEST_KYLIN_FACT/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
- "EDW.TEST_CAL_DT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
- "DEFAULT.TEST_KYLIN_FACT/TRANS_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/TRANS_ID/7fb8fed8-7f3b-4089-a85d-3ac07f575c82.dict",
- "EDW.TEST_CAL_DT/WEEK_BEG_DT": "/dict/EDW.TEST_CAL_DT/WEEK_BEG_DT/962b5f64-bee1-49ee-a072-af882193b719.dict",
- "DEFAULT.TEST_KYLIN_FACT/SLR_SEGMENT_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
- "EDW.TEST_SITES/SITE_NAME": "/dict/EDW.TEST_SITES/SITE_NAME/f363531d-e969-4264-bffd-ac18f8f47220.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE/1fc93a94-1feb-4af4-8078-81a6f1b65e2b.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME/895739d6-27e1-4ecc-b798-5851c319ea40.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
- "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
- "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC/bfb86010-bf4c-4534-a2b0-59d020aed197.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1/8ae44fb8-b01a-4db1-a901-dc5f463038cb.dict",
- "EDW.TEST_SITES/SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
- "DEFAULT.TEST_KYLIN_FACT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3/278d7283-518a-4cd0-b6e1-2573b523bf1f.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER/bb1bb7a5-b02d-45eb-b3c8-f1a4515264ca.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/b2d6fae1-eaac-4ac2-8a01-42e5c8b5c198.dict"
- },
- "storage_location_identifier": "test_III",
- "date_range_start": 0,
- "date_range_end": 0,
- "size_kb": 0,
- "input_records": 0,
- "input_records_size": 0,
- "last_build_time": 0,
- "last_build_job_id": null,
- "create_time": null,
- "binary_signature": null
- }
- ],
+ "status": "DISABLED",
+ "segments": [],
"last_modified": 1420016227424,
"descriptor": "test_kylin_ii_left_join_desc",
"create_time": null,
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json b/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
index a703ae4..9abe3ed 100644
--- a/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
+++ b/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
@@ -4,26 +4,8 @@
"owner": null,
"version": null,
"cost": 10,
- "status": "READY",
- "segments": [
- {
- "uuid": null,
- "name": "19700101000000_20190901000000",
- "status": "READY",
- "dictionaries": {
- },
- "storage_location_identifier": "KYLIN_2STEAMTEST",
- "date_range_start": 0,
- "date_range_end": 0,
- "size_kb": 0,
- "input_records": 0,
- "input_records_size": 0,
- "last_build_time": 0,
- "last_build_job_id": null,
- "create_time": null,
- "binary_signature": null
- }
- ],
+ "status": "DISABLED",
+ "segments": [],
"last_modified": 0,
"descriptor": "test_streaming_table_ii_desc",
"create_time": null,
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
index 7aff714..5633004 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
@@ -32,13 +32,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.realization.IRealization;
@@ -131,45 +125,7 @@ public class IIManager implements IRealizationProvider {
}
return result;
}
-
- public void buildInvertedIndexDictionary(IISegment iiSeg, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
- logger.info("Start building ii dictionary");
- DictionaryManager dictMgr = getDictionaryManager();
- IIDesc iiDesc = iiSeg.getIIInstance().getDescriptor();
- for (TblColRef column : iiDesc.listAllColumns()) {
- logger.info("Dealing with column {}", column);
- if (iiDesc.isMetricsCol(column)) {
- continue;
- }
-
- DictionaryInfo dict = dictMgr.buildDictionary(iiDesc.getModel(), "true", column, factTableValueProvider);
- iiSeg.putDictResPath(column, dict.getResourcePath());
- }
- updateII(iiSeg.getIIInstance());
- }
-
- /**
- * return null if no dictionary for given column
- */
- public Dictionary<?> getDictionary(IISegment iiSeg, TblColRef col) {
- DictionaryInfo info = null;
- try {
- DictionaryManager dictMgr = getDictionaryManager();
- // logger.info("Using metadata url " + metadataUrl +
- // " for DictionaryManager");
- String dictResPath = iiSeg.getDictResPath(col);
- if (dictResPath == null)
- return null;
-
- info = dictMgr.getDictionaryInfo(dictResPath);
- if (info == null)
- throw new IllegalStateException("No dictionary found by " + dictResPath + ", invalid II state; II segment" + iiSeg + ", col " + col);
- } catch (IOException e) {
- throw new IllegalStateException("Failed to get dictionary for II segment" + iiSeg + ", col" + col, e);
- }
-
- return info.getDictionaryObject();
- }
+
public IIInstance createII(IIInstance ii) throws IOException {
@@ -300,10 +256,6 @@ public class IIManager implements IRealizationProvider {
}
}
- private DictionaryManager getDictionaryManager() {
- return DictionaryManager.getInstance(config);
- }
-
private ResourceStore getStore() {
return ResourceStore.getStore(this.config);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
index adcca8b..c3ca464 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
@@ -19,18 +19,14 @@
package org.apache.kylin.invertedindex;
import java.text.SimpleDateFormat;
-import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.TimeZone;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.IDictionaryAware;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
@@ -49,7 +45,7 @@ import org.apache.kylin.metadata.realization.IRealizationSegment;
// TODO: remove segment concept for II, append old hbase table
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class IISegment implements Comparable<IISegment>, IDictionaryAware, IRealizationSegment {
+public class IISegment implements Comparable<IISegment>, IRealizationSegment {
@JsonBackReference
private IIInstance iiInstance;
@@ -83,11 +79,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
private String binarySignature; // a hash of schema and dictionary ID,
// used for sanity check
- @JsonProperty("dictionaries")
- private ConcurrentHashMap<String, String> dictionaries; // table/column ==>
- // dictionary
- // resource path
-
private transient TableRecordInfo tableRecordInfo;
/**
@@ -216,28 +207,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
return storageLocationIdentifier;
}
- public Map<String, String> getDictionaries() {
- if (dictionaries == null)
- dictionaries = new ConcurrentHashMap<String, String>();
- return dictionaries;
- }
-
- public Collection<String> getDictionaryPaths() {
- return getDictionaries().values();
- }
-
- public String getDictResPath(TblColRef col) {
- return getDictionaries().get(dictKey(col));
- }
-
- public void putDictResPath(TblColRef col, String dictResPath) {
- getDictionaries().put(dictKey(col), dictResPath);
- }
-
- private String dictKey(TblColRef col) {
- return col.getTable() + "/" + col.getName();
- }
-
/**
* @param storageLocationIdentifier the storageLocationIdentifier to set
*/
@@ -262,10 +231,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
return tableRecordInfo;
}
- // public void updateDictionary(List<Dictionary<?>> dicts) {
- // getTableRecordInfo().updateDictionary( dicts);
- // }
-
public List<TblColRef> getColumns() {
return this.getTableRecordInfo().getColumns();
}
@@ -275,20 +240,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
return Objects.toStringHelper(this).add("uuid", uuid).add("create_time_utc:", createTimeUTC).add("name", name).add("last_build_job_id", lastBuildJobID).add("status", status).toString();
}
- @Override
- public int getColumnLength(TblColRef col) {
-
- int index = getTableRecordInfo().findColumn(col);
- return getTableRecordInfo().getDigest().length(index);
- }
-
- @Override
- public Dictionary<?> getDictionary(TblColRef col) {
-
- int index = getTableRecordInfo().findColumn(col);
- return getTableRecordInfo().dict(index);
- }
-
public long getCreateTimeUTC() {
return createTimeUTC;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
new file mode 100644
index 0000000..1c293d7
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.invertedindex.index;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ */
+public final class SliceBuilder {
+
+ private static Logger logger = LoggerFactory.getLogger(SliceBuilder.class);
+
+ private final BatchSliceMaker sliceMaker;
+ private final IIDesc iiDesc;
+ private final boolean useLocalDict;
+
+ public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) {
+ this.iiDesc = desc;
+ this.sliceMaker = new BatchSliceMaker(desc, shard);
+ this.useLocalDict = useLocalDict;
+ }
+
+ public Slice buildSlice(StreamingBatch microStreamBatch) {
+ final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
+ @Nullable
+ @Override
+ public List<String> apply(@Nullable StreamingMessage input) {
+ return input.getData();
+ }
+ });
+ final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
+ TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
+ return build(messages, tableRecordInfo, dictionaries);
+ }
+
+ private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
+ final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
+ @Nullable
+ @Override
+ public TableRecord apply(@Nullable List<String> input) {
+ TableRecord result = tableRecordInfo.createTableRecord();
+ for (int i = 0; i < input.size(); i++) {
+ result.setValueString(i, input.get(i));
+ }
+ return result;
+ }
+ }));
+ slice.setLocalDictionaries(localDictionary);
+ return slice;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
index baafacd..8a0c2ba 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
@@ -53,11 +53,6 @@ public class IIInstanceTest extends LocalFileMetadataTestCase {
Assert.assertTrue(iiInstances.size() > 0);
- IIInstance instance = iiInstances.get(0);
-
- Dictionary dict = mgr.getDictionary(instance.getFirstSegment(), instance.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_SITE_ID"));
-
- Assert.assertNotNull(dict);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 1eb2683..b8d1333 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
@@ -78,6 +79,8 @@ public class HiveMRInput implements IMRInput {
try {
HCatInputFormat.setInput(job, dbName, tableName);
job.setInputFormatClass(HCatInputFormat.class);
+
+ job.setMapOutputValueClass(org.apache.hive.hcatalog.data.DefaultHCatRecord.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index a1ab712..ee5a555 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -48,8 +48,8 @@ import kafka.message.MessageAndOffset;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.engine.streaming.IStreamingInput;
-import org.apache.kylin.engine.streaming.StreamingBatch;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.source.kafka.util.KafkaRequester;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index aace8bc..3455f1d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -47,7 +47,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.IntermediateColumnDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.kafka.config.KafkaConfig;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index c0e506f..9691ea7 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -40,7 +40,7 @@ import java.util.List;
import kafka.message.MessageAndOffset;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.TblColRef;
import com.google.common.collect.Lists;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 4fae228..00f93a5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -43,7 +43,7 @@ import kafka.message.MessageAndOffset;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
index de5e58e..0e29a0c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
@@ -40,7 +40,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.DaemonThreadFactory;
import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.StreamingParser;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
index 2833ea4..96b7fa7 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
@@ -11,7 +11,7 @@ import kafka.javaapi.PartitionMetadata;
import kafka.message.MessageAndOffset;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.source.kafka.StreamingParser;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
index e4b688f..0e0a8ce 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
@@ -19,6 +19,7 @@
package org.apache.kylin.storage.hbase.ii;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
@@ -43,13 +44,29 @@ public class IICreateHFileMapper extends KylinMapper<ImmutableBytesWritable, Imm
@Override
protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException {
+ ByteBuffer buffer = ByteBuffer.wrap(value.get(), value.getOffset(), value.getLength());
+ int totalLength = value.getLength();
+ int valueLength = buffer.getInt();
+ int dictionaryLength = totalLength - valueLength - 4;
KeyValue kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, //
IIDesc.HBASE_QUALIFIER_BYTES, 0, IIDesc.HBASE_QUALIFIER_BYTES.length, //
timestamp, Type.Put, //
- value.get(), value.getOffset(), value.getLength());
+ buffer.array(), buffer.position(), valueLength);
+ // write value
context.write(key, kv);
+
+ kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
+ IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, //
+ IIDesc.HBASE_DICTIONARY_BYTES, 0, IIDesc.HBASE_DICTIONARY_BYTES.length, //
+ timestamp, Type.Put, //
+ buffer.array(), buffer.position() + valueLength, dictionaryLength);
+
+
+ // write dictionary
+ context.write(key, kv);
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
index 0a72a91..bcfe346 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
@@ -47,7 +47,7 @@ public class IICreateHTableJob extends AbstractHadoopJob {
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
-
+ HBaseAdmin admin = null;
try {
options.addOption(OPTION_II_NAME);
options.addOption(OPTION_HTABLE_NAME);
@@ -61,6 +61,22 @@ public class IICreateHTableJob extends AbstractHadoopJob {
IIInstance ii = iiManager.getII(iiName);
int sharding = ii.getDescriptor().getSharding();
+
+ Configuration conf = HBaseConfiguration.create(getConf());
+ // check if the table already exists
+ admin = new HBaseAdmin(conf);
+ if (admin.tableExists(tableName)) {
+ if (admin.isTableEnabled(tableName)) {
+ logger.info("Table " + tableName + " already exists and is enabled, no need to create.");
+ return 0;
+ } else {
+ logger.error("Table " + tableName + " is disabled, couldn't append data");
+ return 1;
+ }
+ }
+
+ // table doesn't exist, need to create
+
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY);
cf.setMaxVersions(1);
@@ -100,7 +116,6 @@ public class IICreateHTableJob extends AbstractHadoopJob {
tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
- Configuration conf = HBaseConfiguration.create(getConf());
if (User.isHBaseSecurityEnabled(conf)) {
// add coprocessor for bulk load
tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
@@ -108,13 +123,7 @@ public class IICreateHTableJob extends AbstractHadoopJob {
IIDeployCoprocessorCLI.deployCoprocessor(tableDesc);
- // drop the table first
- HBaseAdmin admin = new HBaseAdmin(conf);
- if (admin.tableExists(tableName)) {
- admin.disableTable(tableName);
- admin.deleteTable(tableName);
- }
-
+
// create table
byte[][] splitKeys = getSplits(sharding);
if (splitKeys.length == 0)
@@ -126,12 +135,14 @@ public class IICreateHTableJob extends AbstractHadoopJob {
}
}
System.out.println("create hbase table " + tableName + " done.");
- admin.close();
return 0;
} catch (Exception e) {
printUsage(options);
throw e;
+ } finally {
+ if (admin != null)
+ admin.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
index ff8b659..11c1711 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
@@ -54,7 +54,7 @@ public class HBaseMROutput implements IMROutput {
@Override
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
- steps.addCubingGarbageCollectionSteps(jobFlow);
+ steps.addInvertedIndexGarbageCollectionSteps(jobFlow);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 1267d2d..03db6b0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -184,7 +184,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
}
public void addSaveIIToHTableSteps(DefaultChainedExecutable jobFlow, String rootPath) {
- // create htable step
+ // create htable if it doesn't exist
jobFlow.addTask(createCreateIIHTableStep(seg));
final String iiPath = rootPath + "*";
@@ -198,6 +198,22 @@ public class HBaseMRSteps extends JobBuilderSupport {
}
+ public void addInvertedIndexGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+ String jobId = jobFlow.getId();
+
+ List<String> toDeletePaths = new ArrayList<>();
+ toDeletePaths.add(getJobWorkingDir(jobId));
+
+ HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+ step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+ step.setDeletePaths(toDeletePaths);
+ step.setJobId(jobId);
+
+ jobFlow.addTask(step);
+ }
+
+
+
private HadoopShellExecutable createCreateIIHTableStep(IRealizationSegment seg) {
HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index b200c2e..15dc993 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -46,7 +46,6 @@ import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
@@ -108,7 +107,7 @@ public class DeployCoprocessorCLI {
private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+ Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index 88cb7de..2137f57 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -35,8 +35,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
index 335e00c..6c6ed80 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
@@ -6,6 +6,7 @@ import java.util.Arrays;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
@@ -40,7 +41,10 @@ public class TsConditionEraserTest extends LocalFileMetadataTestCase {
@Before
public void setup() throws IOException {
this.createTestMetadata();
- this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+ IIManager iiManager = IIManager.getInstance(getTestConfig());
+ this.ii = iiManager.getII("test_kylin_ii_left_join");
+ IISegment segment = iiManager.buildSegment(ii, 0, System.currentTimeMillis());
+ ii.getSegments().add(segment);
this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
this.factTableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.TEST_KYLIN_FACT");
this.caldt = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "CAL_DT");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
index e271129..8b56605 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
@@ -18,20 +18,11 @@
package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
+import com.google.common.collect.Lists;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.metadata.measure.LongMutable;
import org.apache.kylin.metadata.measure.MeasureAggregator;
@@ -45,7 +36,11 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.*;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
/**
*
@@ -90,6 +85,10 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
@Test
public void testSerializeAggregator() {
final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+ if (ii.getFirstSegment() == null) {
+ IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+ ii.getSegments().add(segment);
+ }
final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
final EndpointAggregators endpointAggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
byte[] x = EndpointAggregators.serialize(endpointAggregators);
@@ -139,6 +138,10 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
@Test
public void basicTest() {
final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+ if (ii.getFirstSegment() == null) {
+ IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+ ii.getSegments().add(segment);
+ }
final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
final EndpointAggregators aggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
final EndpointAggregationCache aggCache = new EndpointAggregationCache(aggregators);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
index 791002f..3e34495 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
import org.junit.After;
@@ -41,6 +42,10 @@ public class TableRecordInfoTest extends LocalFileMetadataTestCase {
public void setup() throws IOException {
this.createTestMetadata();
this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+ if (ii.getFirstSegment() == null) {
+ IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+ ii.getSegments().add(segment);
+ }
this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
index 412e335..4e5a9d9 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
@@ -58,6 +59,10 @@ public class TsConditionExtractorTest extends LocalFileMetadataTestCase {
public void setup() throws IOException {
this.createTestMetadata();
this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+ if (ii.getFirstSegment() == null) {
+ IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+ ii.getSegments().add(segment);
+ }
this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
this.factTableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.TEST_KYLIN_FACT");
this.calDt = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "CAL_DT");