You are viewing a plain-text version of this content; the link to the canonical version did not survive the plain-text conversion.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/04/11 00:48:04 UTC
[27/34] incubator-kylin git commit: fix ci
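fix ci: move per-column dictionary building from SliceBuilder into the reusable IIDictionaryBuilder, extract metric-info creation in EndpointAggregators into a helper, and rewrite EndpointAggregationTest to aggregate mocked raw rows.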
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7eebef63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7eebef63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7eebef63
Branch: refs/heads/streaming-localdict
Commit: 7eebef63e346dffcd2c3c83b502a9b735789a557
Parents: 6406540
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Apr 9 11:13:41 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Apr 9 17:04:29 2015 +0800
----------------------------------------------------------------------
.../invertedindex/index/TableRecordInfo.java | 2 +-
.../invertedindex/util/IIDictionaryBuilder.java | 82 +++++++++
.../endpoint/EndpointAggregators.java | 55 ++++---
.../endpoint/EndpointAggregationTest.java | 165 ++++++++-----------
.../streaming/invertedindex/SliceBuilder.java | 46 +-----
5 files changed, 183 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7eebef63/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index 0955a5f..606e00e 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -48,7 +48,7 @@ public class TableRecordInfo {
}
public TableRecordInfo(IIDesc desc) {
- this(desc, new Dictionary<?>[desc.listAllColumns().size()]);
+ this(desc, new Dictionary<?>[0]); // NOTE(review): previously one null slot per column, now zero-length; confirm the delegated constructor never indexes dictionaryMap by column position (that would now throw ArrayIndexOutOfBoundsException)
}
public TableRecordInfo(IIDesc desc, Dictionary<?>[] dictionaryMap) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7eebef63/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java
new file mode 100644
index 0000000..45e8058
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.invertedindex.util;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Builds per-column local dictionaries for an inverted-index slice from raw string rows.
+ */
+public final class IIDictionaryBuilder {
+
+    private IIDictionaryBuilder() {
+        // static utility class, no instances
+    }
+
+    /**
+     * Builds a dictionary for every non-metric column that has at least one value in {@code table}.
+     *
+     * @param table rows of raw cell values; cell {@code i} of a row belongs to
+     *              {@code desc.listAllColumns().get(i)}
+     * @param desc  descriptor defining the column order and which column positions are metrics
+     * @return an array of length {@code desc.listAllColumns().size()} indexed by
+     *         {@code desc.findColumn(column)}; slots for metric columns and for columns
+     *         without any value stay {@code null}
+     */
+    public static Dictionary<?>[] buildDictionary(List<List<String>> table, IIDesc desc) {
+        final HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
+        final List<TblColRef> allColumns = desc.listAllColumns();
+        for (List<String> row : table) {
+            for (int i = 0; i < row.size(); i++) {
+                // only non-metric columns are dictionary-encoded; metric cells are skipped
+                if (!desc.isMetricsCol(i)) {
+                    valueMap.put(allColumns.get(i), row.get(i));
+                }
+            }
+        }
+
+        final Dictionary<?>[] result = new Dictionary<?>[allColumns.size()];
+        for (TblColRef tblColRef : valueMap.keySet()) {
+            final Collection<byte[]> bytes = Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
+                @Nullable
+                @Override
+                public byte[] apply(@Nullable String input) {
+                    // encode explicitly; the no-arg getBytes() depends on the JVM default charset
+                    return input == null ? null : input.getBytes(StandardCharsets.UTF_8);
+                }
+            });
+            result[desc.findColumn(tblColRef)] = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes);
+        }
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7eebef63/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
index 5af5f44..19d8fd1 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
@@ -21,6 +21,7 @@ package org.apache.kylin.storage.hbase.coprocessor.endpoint;
import java.nio.ByteBuffer;
import java.util.List;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
@@ -47,7 +48,7 @@ public class EndpointAggregators {
Count, DimensionAsMetric, DistinctCount, Normal
}
- private static class MetricInfo {
+ private static final class MetricInfo {
private MetricType type;
private int refIndex = -1;
private int precision = -1;
@@ -69,34 +70,36 @@ public class EndpointAggregators {
}
+ /**
+ * Translates one aggregation function into the MetricInfo used by the endpoint.
+ * The fact-table column is resolved only for functions that actually read one, so
+ * COUNT and dimension-as-metric functions never dereference their parameter.
+ *
+ * @throws IllegalStateException if the function's column is not found in the II
+ */
+ private static MetricInfo generateMetricInfo(TableRecordInfo tableInfo, FunctionDesc functionDesc) {
+ if (functionDesc.isCount()) {
+ return new MetricInfo(MetricType.Count);
+ }
+ if (functionDesc.isDimensionAsMetric()) {
+ return new MetricInfo(MetricType.DimensionAsMetric);
+ }
+ final String columnName = functionDesc.getParameter().getValue();
+ final int index = tableInfo.findFactTableColumn(columnName);
+ Preconditions.checkState(index >= 0, "Column %s is not found in II", columnName);
+ if (functionDesc.isCountDistinct()) {
+ return new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision());
+ }
+ return new MetricInfo(MetricType.Normal, index);
+ }
+
public static EndpointAggregators fromFunctions(TableRecordInfo tableInfo, List<FunctionDesc> metrics) {
- String[] funcNames = new String[metrics.size()];
- String[] dataTypes = new String[metrics.size()];
- MetricInfo[] metricInfos = new MetricInfo[metrics.size()];
+ final int metricSize = metrics.size();
+ String[] funcNames = new String[metricSize];
+ String[] dataTypes = new String[metricSize];
+ MetricInfo[] metricInfos = new MetricInfo[metricSize];
- for (int i = 0; i < metrics.size(); i++) {
+ for (int i = 0; i < metricSize; i++) {
FunctionDesc functionDesc = metrics.get(i);
//TODO: what if funcionDesc's type is different from tablDesc? cause scale difference
funcNames[i] = functionDesc.getExpression();
dataTypes[i] = functionDesc.getReturnType();
-
- if (functionDesc.isCount()) {
- metricInfos[i] = new MetricInfo(MetricType.Count);
- } else if (functionDesc.isDimensionAsMetric()) {
- metricInfos[i] = new MetricInfo(MetricType.DimensionAsMetric);
- } else {
- int index = tableInfo.findFactTableColumn(functionDesc.getParameter().getValue());
- if (index < 0) {
- throw new IllegalStateException("Column " + functionDesc.getParameter().getValue() + " is not found in II");
- }
-
- if (functionDesc.isCountDistinct()) {
- metricInfos[i] = new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision());
- } else {
- metricInfos[i] = new MetricInfo(MetricType.Normal, index);
- }
- }
+ // column lookup happens inside the helper, only for functions that read a column
+ metricInfos[i] = generateMetricInfo(tableInfo, functionDesc);
}
return new EndpointAggregators(funcNames, dataTypes, metricInfos, tableInfo.getDigest());
@@ -115,7 +118,7 @@ public class EndpointAggregators {
final LongWritable ONE = new LongWritable(1);
- public EndpointAggregators(String[] funcNames, String[] dataTypes, MetricInfo[] metricInfos, TableRecordInfoDigest tableInfo) {
+ private EndpointAggregators(String[] funcNames, String[] dataTypes, MetricInfo[] metricInfos, TableRecordInfoDigest tableInfo) {
this.funcNames = funcNames;
this.dataTypes = dataTypes;
this.metricInfos = metricInfos;
@@ -161,16 +164,16 @@ public class EndpointAggregators {
rawTableRecord.setBytes(row, 0, row.length);
for (int metricIndex = 0; metricIndex < metricInfos.length; ++metricIndex) {
- if (metricInfos[metricIndex].type == MetricType.Count) {
+ final MetricInfo metricInfo = metricInfos[metricIndex];
+ if (metricInfo.type == MetricType.Count) {
measureAggrs[metricIndex].aggregate(ONE);
continue;
}
- if (metricInfos[metricIndex].type == MetricType.DimensionAsMetric) {
+ if (metricInfo.type == MetricType.DimensionAsMetric) {
continue;
}
- MetricInfo metricInfo = metricInfos[metricIndex];
MeasureAggregator aggregator = measureAggrs[metricIndex];
FixedLenMeasureCodec measureSerializer = measureSerializers[metricIndex];
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7eebef63/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
index e90f5b5..7cb0d09 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
@@ -18,24 +18,18 @@
package org.apache.kylin.storage.hbase.coprocessor.endpoint;
-import static org.junit.Assert.assertEquals;
-
+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.index.TableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.*;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.storage.hbase.coprocessor.FilterDecorator;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -44,6 +38,9 @@ import org.junit.Test;
import java.io.IOException;
import java.util.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
/**
* Created by Hongbin Ma(Binmahone) on 11/27/14.
*
@@ -51,39 +48,10 @@ import java.util.*;
*/
@Ignore("need to mock up TableRecordInfo")
public class EndpointAggregationTest extends LocalFileMetadataTestCase {
- IIInstance ii;
- TableRecordInfo tableRecordInfo;
-
- CoprocessorProjector projector;
- EndpointAggregators aggregators;
- CoprocessorFilter filter;
-
- EndpointAggregationCache aggCache;
- List<TableRecord> tableData;
-
- TableDesc factTableDesc;
@Before
public void setup() throws IOException {
this.createTestMetadata();
- this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
- this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
- factTableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.TEST_KYLIN_FACT");
- TblColRef formatName = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_FORMAT_NAME");
- TblColRef siteId = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_SITE_ID");
-
- Collection<TblColRef> dims = new HashSet<>();
- dims.add(formatName);
- projector = CoprocessorProjector.makeForEndpoint(tableRecordInfo, dims);
- aggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
-
- CompareTupleFilter rawFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
- rawFilter.addChild(new ColumnTupleFilter(siteId));
- rawFilter.addChild(new ConstantTupleFilter("0"));
- filter = CoprocessorFilter.fromFilter(this.ii.getFirstSegment(), rawFilter, FilterDecorator.FilterConstantsTreatment.AS_IT_IS);
-
- aggCache = new EndpointAggregationCache(aggregators);
- tableData = mockTable();
}
@After
@@ -91,54 +59,6 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
cleanupTestMetadata();
}
- private List<TableRecord> mockTable() {
-
- ColumnDesc[] factTableColumns = factTableDesc.getColumns();
- int[] factTableColumnIndex = new int[factTableColumns.length];
- for (int i = 0; i < factTableColumnIndex.length; ++i) {
- factTableColumnIndex[i] = tableRecordInfo.findColumn(new TblColRef(factTableColumns[i]));
- }
-
- TableRecord temp1 = tableRecordInfo.createTableRecord();
- temp1.setValueString(factTableColumnIndex[0], "10000000239");
- temp1.setValueString(factTableColumnIndex[1], "2012-03-22");
- temp1.setValueString(factTableColumnIndex[2], "Auction");
- temp1.setValueString(factTableColumnIndex[3], "80135");
- temp1.setValueString(factTableColumnIndex[4], "0");
- temp1.setValueString(factTableColumnIndex[5], "14");
- temp1.setValueString(factTableColumnIndex[6], "199.99");
- temp1.setValueString(factTableColumnIndex[7], "1");
- temp1.setValueString(factTableColumnIndex[8], "10000005");
-
- TableRecord temp2 = tableRecordInfo.createTableRecord();
- temp2.setValueString(factTableColumnIndex[0], "10000000244");
- temp2.setValueString(factTableColumnIndex[1], "2012-11-11");
- temp2.setValueString(factTableColumnIndex[2], "Auction");
- temp2.setValueString(factTableColumnIndex[3], "16509");
- temp2.setValueString(factTableColumnIndex[4], "101");
- temp2.setValueString(factTableColumnIndex[5], "12");
- temp2.setValueString(factTableColumnIndex[6], "2.09");
- temp2.setValueString(factTableColumnIndex[7], "1");
- temp2.setValueString(factTableColumnIndex[8], "10000004");
-
- TableRecord temp3 = tableRecordInfo.createTableRecord();
- temp3.setValueString(factTableColumnIndex[0], "10000000259");
- temp3.setValueString(factTableColumnIndex[1], "2012-07-12");
- temp3.setValueString(factTableColumnIndex[2], "Others");
- temp3.setValueString(factTableColumnIndex[3], "15687");
- temp3.setValueString(factTableColumnIndex[4], "0");
- temp3.setValueString(factTableColumnIndex[5], "14");
- temp3.setValueString(factTableColumnIndex[6], "100");
- temp3.setValueString(factTableColumnIndex[7], "1");
- temp3.setValueString(factTableColumnIndex[8], "10000020");
-
- List<TableRecord> ret = new ArrayList<TableRecord>();
- ret.add(temp1);
- ret.add(temp2);
- ret.add(temp3);
- return ret;
- }
-
private List<FunctionDesc> buildAggregations() {
List<FunctionDesc> functions = new ArrayList<FunctionDesc>();
@@ -164,26 +84,72 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
}
@Test
- public void testSerializeAggreagtor() {
- EndpointAggregators endpointAggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
+ public void testSerializeAggregator() {
+ final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+ final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
+ final EndpointAggregators endpointAggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
byte[] x = EndpointAggregators.serialize(endpointAggregators);
- EndpointAggregators.deserialize(x);
+ final EndpointAggregators result = EndpointAggregators.deserialize(x);
+ assertArrayEquals(endpointAggregators.dataTypes, result.dataTypes);
+ assertArrayEquals(endpointAggregators.funcNames, result.funcNames);
+ assertArrayEquals(endpointAggregators.metricValues, result.metricValues);
+ assertEquals(endpointAggregators.rawTableRecord.getBytes().length, result.rawTableRecord.getBytes().length);
+ }
+
+ private byte[] randomBytes(final int length) {
+ final byte[] result = new byte[length];
+ // a single nextBytes call already fills the whole array
+ new Random().nextBytes(result);
+ return result;
+ }
+
+ /**
+ * Builds three raw rows: two share one group-by key (prices 199.99 and 2.09) and the third
+ * uses a different key (price 100). All remaining bytes of each row are random.
+ */
+ private List<byte[]> mockData(TableRecordInfo tableRecordInfo, TblColRef groupByColumn) {
+ final List<byte[]> result = Lists.newArrayList();
+ // NOTE(review): hard-coded position of the aggregated measure column in test_kylin_ii_left_join; confirm against buildAggregations()
+ final int priceColumnIndex = 23;
+ // put the group keys in the column the projector groups by, not at a fixed position
+ final int groupByColumnIndex = tableRecordInfo.getDescriptor().findColumn(groupByColumn);
+ TblColRef column = tableRecordInfo.getDescriptor().listAllColumns().get(priceColumnIndex);
+ FixedLenMeasureCodec codec = FixedLenMeasureCodec.get(column.getType());
+
+ byte[] data = randomBytes(tableRecordInfo.getDigest().getByteFormLen());
+ byte[] groupOne = randomBytes(tableRecordInfo.getDigest().length(groupByColumnIndex));
+ codec.write(codec.valueOf("199.99"), data, tableRecordInfo.getDigest().offset(priceColumnIndex));
+ System.arraycopy(groupOne, 0, data, tableRecordInfo.getDigest().offset(groupByColumnIndex), groupOne.length);
+ result.add(data);
+
+ data = randomBytes(tableRecordInfo.getDigest().getByteFormLen());
+ codec.write(codec.valueOf("2.09"), data, tableRecordInfo.getDigest().offset(priceColumnIndex));
+ System.arraycopy(groupOne, 0, data, tableRecordInfo.getDigest().offset(groupByColumnIndex), groupOne.length);
+ result.add(data);
+
+ // derive the second key from the first so the two groups can never collide by chance
+ byte[] groupTwo = groupOne.clone();
+ groupTwo[0] = (byte) ~groupTwo[0];
+ data = randomBytes(tableRecordInfo.getDigest().getByteFormLen());
+ System.arraycopy(groupTwo, 0, data, tableRecordInfo.getDigest().offset(groupByColumnIndex), groupTwo.length);
+ codec.write(codec.valueOf("100"), data, tableRecordInfo.getDigest().offset(priceColumnIndex));
+ result.add(data);
+
+ return result;
}
@Test
- @SuppressWarnings("rawtypes")
public void basicTest() {
-
- for (int i = 0; i < tableData.size(); ++i) {
- byte[] data = tableData.get(i).getBytes();
+ final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+ final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
+ final EndpointAggregators aggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
+ final EndpointAggregationCache aggCache = new EndpointAggregationCache(aggregators);
+ final Collection<TblColRef> dims = new HashSet<>();
+ final TblColRef groupByColumn = ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_FORMAT_NAME");
+ dims.add(groupByColumn);
+ CoprocessorProjector projector = CoprocessorProjector.makeForEndpoint(tableRecordInfo, dims);
+ List<byte[]> rawData = mockData(tableRecordInfo, groupByColumn);
+ for (int i = 0; i < rawData.size(); ++i) {
+ byte[] data = rawData.get(i);
CoprocessorProjector.AggrKey aggKey = projector.getAggrKey(data);
MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
aggregators.aggregate(bufs, data);
aggCache.checkMemoryUsage();
}
-
- assertEquals(aggCache.getAllEntries().size(), 2);
-
+ // mockData produces exactly two distinct group-by keys
+ assertEquals(2, aggCache.getAllEntries().size());
long sumTotal = 0;
long minTotal = 0;
for (Map.Entry<CoprocessorProjector.AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
@@ -193,7 +159,6 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
}
assertEquals(3020800, sumTotal);
assertEquals(1020900, minTotal);
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7eebef63/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
index 22bfa42..6246215 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
@@ -34,28 +34,22 @@
package org.apache.kylin.streaming.invertedindex;
-import java.util.Collection;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.invertedindex.index.BatchSliceMaker;
import org.apache.kylin.invertedindex.index.Slice;
import org.apache.kylin.invertedindex.index.TableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
import org.apache.kylin.streaming.Stream;
import org.apache.kylin.streaming.StreamParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
+import javax.annotation.Nullable;
+import java.util.List;
/**
* Created by qianzhou on 3/27/15.
@@ -81,39 +75,11 @@ public final class SliceBuilder {
return streamParser.parse(input);
}
});
- final Dictionary<?>[] dictionaryMap = buildDictionary(table, iiDesc);
+ final Dictionary<?>[] dictionaryMap = IIDictionaryBuilder.buildDictionary(table, iiDesc); // NOTE(review): unlike the removed private buildDictionary, the shared builder does not log "build dictionary for column ..." per column
TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaryMap);
return build(table, tableRecordInfo, dictionaryMap);
}
- private Dictionary<?>[] buildDictionary(List<List<String>> table, IIDesc desc) {
- HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
- final List<TblColRef> allColumns = desc.listAllColumns();
- for (List<String> row : table) {
- for (int i = 0; i < row.size(); i++) {
- String cell = row.get(i);
- if (!desc.isMetricsCol(i)) {
- valueMap.put(allColumns.get(i), cell);
- }
- }
- }
-
- Dictionary<?>[] result = new Dictionary<?>[allColumns.size()];
- for (TblColRef tblColRef : valueMap.keySet()) {
- final Collection<byte[]> bytes = Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
- @Nullable
- @Override
- public byte[] apply(String input) {
- return input == null ? null : input.getBytes();
- }
- });
- logger.info("build dictionary for column " + tblColRef);
- final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes);
- result[desc.findColumn(tblColRef)] = dict;
- }
- return result;
- }
-
private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
@Nullable