You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/30 07:32:07 UTC
[3/5] kylin git commit: KYLIN-2217 Reducers build dictionaries locally
KYLIN-2217 Reducers build dictionaries locally
Signed-off-by: Li Yang <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1af08e4b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1af08e4b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1af08e4b
Branch: refs/heads/KYLIN-2217-2
Commit: 1af08e4b8875d33bfc5dd124fed72d6042456c32
Parents: b1b90ad
Author: xiefan46 <95...@qq.com>
Authored: Wed Nov 23 09:48:55 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Wed Nov 30 15:31:58 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 9 +-
.../java/org/apache/kylin/cube/CubeManager.java | 3 +-
.../kylin/cube/cli/DictionaryGeneratorCLI.java | 25 ++-
.../apache/kylin/dict/DictionaryManager.java | 17 +-
.../apache/kylin/dict/DictionaryProvider.java | 28 ++++
.../dict/DictionaryReducerLocalGenerator.java | 156 +++++++++++++++++++
.../dict/IDictionaryReducerLocalBuilder.java | 31 ++++
.../kylin/dict/DictionaryProviderTest.java | 109 +++++++++++++
.../storage/translate/ColumnValueRange.java | 2 +-
.../engine/mr/steps/CreateDictionaryJob.java | 44 +++++-
.../mr/steps/FactDistinctColumnsReducer.java | 123 +++++++++++++--
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 54 +++----
.../storage/hbase/cube/v1/CubeStorageQuery.java | 6 +-
13 files changed, 547 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7dcc771..766c04d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -719,7 +719,14 @@ abstract public class KylinConfigBase implements Serializable {
//UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns
public int getUHCReducerCount() {
- return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "3"));
+ return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "1"));
+ }
+
+    public boolean isReducerLocalBuildDict() {
+        // Reducer-local dictionary building is only enabled when exactly one UHC reducer is configured.
+        if (getUHCReducerCount() == 1) {
+            return Boolean.parseBoolean(getOptional("kylin.engine.mr.reducer-local-build-dict", "true"));
+        }
+        return false;
}
public String getYarnStatusCheckUrl() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index b4422d2..119a21a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -213,6 +213,7 @@ public class CubeManager implements IRealizationProvider {
return result;
}
+
public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, ReadableTable inpTable) throws IOException {
CubeDesc cubeDesc = cubeSeg.getCubeDesc();
if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
@@ -221,6 +222,7 @@ public class CubeManager implements IRealizationProvider {
String builderClass = cubeDesc.getDictionaryBuilderClass(col);
DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(cubeDesc.getModel(), col, inpTable, builderClass);
+
saveDictionaryInfo(cubeSeg, col, dictInfo);
return dictInfo;
}
@@ -266,7 +268,6 @@ public class CubeManager implements IRealizationProvider {
} catch (IOException e) {
throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col, e);
}
-
return (Dictionary<String>) info.getDictionaryObject();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index a6aeb96..a4e1df0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -22,11 +22,13 @@ import java.io.IOException;
import java.util.Set;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.DictionaryProvider;
import org.apache.kylin.dict.DistinctColumnValuesProvider;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.DataModelDesc;
@@ -44,21 +46,30 @@ public class DictionaryGeneratorCLI {
private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
- public static void processSegment(KylinConfig config, String cubeName, String segmentID, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
+ public static void processSegment(KylinConfig config, String cubeName, String segmentID, DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
CubeSegment segment = cube.getSegmentById(segmentID);
- processSegment(config, segment, factTableValueProvider);
+ processSegment(config, segment, factTableValueProvider, dictProvider);
}
- private static void processSegment(KylinConfig config, CubeSegment cubeSeg, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
+ private static void processSegment(KylinConfig config, CubeSegment cubeSeg, DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
CubeManager cubeMgr = CubeManager.getInstance(config);
// dictionary
for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) {
logger.info("Building dictionary for " + col);
ReadableTable inpTable = decideInputTable(cubeSeg.getModel(), col, factTableValueProvider);
- cubeMgr.buildDictionary(cubeSeg, col, inpTable);
+ if (config.isReducerLocalBuildDict() && dictProvider != null) {
+ Dictionary<String> dict = dictProvider.getDictionary(col);
+ if (dict != null) {
+ cubeMgr.saveDictionary(cubeSeg, col, inpTable, dict);
+ } else {
+ cubeMgr.buildDictionary(cubeSeg, col, inpTable);
+ }
+ } else {
+ cubeMgr.buildDictionary(cubeSeg, col, inpTable);
+ }
}
// snapshot
@@ -68,19 +79,19 @@ public class DictionaryGeneratorCLI {
if (cubeSeg.getModel().isLookupTable(table))
toSnapshot.add(table.getTableIdentity());
}
-
+
for (String tableIdentity : toSnapshot) {
logger.info("Building snapshot of " + tableIdentity);
cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity);
}
}
-
+
private static ReadableTable decideInputTable(DataModelDesc model, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) {
KylinConfig config = model.getConfig();
DictionaryManager dictMgr = DictionaryManager.getInstance(config);
TblColRef srcCol = dictMgr.decideSourceData(model, col);
String srcTable = srcCol.getTable();
-
+
ReadableTable inpTable;
if (model.isFactTable(srcTable)) {
inpTable = factTableValueProvider.getDistinctValuesFor(srcCol);
http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 6178234..0caef14 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -87,6 +87,7 @@ public class DictionaryManager {
private KylinConfig config;
private LoadingCache<String, DictionaryInfo> dictCache; // resource
+
// path ==>
// DictionaryInfo
@@ -275,10 +276,12 @@ public class DictionaryManager {
return buildDictionary(model, col, inpTable, null);
}
+
public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, ReadableTable inpTable, String builderClass) throws IOException {
if (inpTable.exists() == false)
return null;
+
logger.info("building dictionary for " + col);
DictionaryInfo dictInfo = createDictionaryInfo(model, col, inpTable);
@@ -291,6 +294,12 @@ public class DictionaryManager {
logger.info("Building dictionary object " + JsonUtil.writeValueAsString(dictInfo));
Dictionary<String> dictionary;
+ dictionary = buildDictFromReadableTable(inpTable, dictInfo, builderClass, col);
+ return trySaveNewDict(dictionary, dictInfo);
+ }
+
+ private Dictionary<String> buildDictFromReadableTable(ReadableTable inpTable, DictionaryInfo dictInfo, String builderClass, TblColRef col) throws IOException {
+ Dictionary<String> dictionary;
IDictionaryValueEnumerator columnValueEnumerator = null;
try {
columnValueEnumerator = new TableColumnValueEnumerator(inpTable.getReader(), dictInfo.getSourceColumnIndex());
@@ -304,7 +313,7 @@ public class DictionaryManager {
if (columnValueEnumerator != null)
columnValueEnumerator.close();
}
- return trySaveNewDict(dictionary, dictInfo);
+ return dictionary;
}
public DictionaryInfo saveDictionary(DataModelDesc model, TblColRef col, ReadableTable inpTable, Dictionary<String> dictionary) throws IOException {
@@ -336,19 +345,19 @@ public class DictionaryManager {
// FK on fact table and join type is inner, use PK from lookup instead
if (model.isFactTable(col.getTable()) == false)
return col;
-
+
// find a lookup table that the col joins as FK
for (TableRef lookup : model.getLookupTables()) {
JoinDesc lookupJoin = model.getJoinByPKSide(lookup);
int find = ArrayUtils.indexOf(lookupJoin.getForeignKeyColumns(), col);
if (find < 0)
continue;
-
+
// make sure the joins are all inner up to the root
if (isAllInnerJoinsToRoot(model, lookupJoin))
return lookupJoin.getPrimaryKeyColumns()[find];
}
-
+
return col;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java
new file mode 100644
index 0000000..6387535
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Supplies dictionaries that were already built elsewhere, e.g. the dictionaries built
+ * locally by the fact-distinct-columns reducers.
+ *
+ * Created by xiefan on 16-11-23.
+ */
+public interface DictionaryProvider {
+
+    /**
+     * Returns the pre-built dictionary of a column.
+     *
+     * @param col the column whose dictionary is requested
+     * @return the dictionary, or {@code null} when none is available; callers are expected to
+     *         fall back to building it from the column's distinct values (as DictionaryGeneratorCLI does)
+     */
+    Dictionary<String> getDictionary(TblColRef col);
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java
new file mode 100644
index 0000000..35d379a
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.dict;
+
+import com.google.common.base.Preconditions;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.datatype.DataType;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+
+/**
+ * Creates {@link IDictionaryReducerLocalBuilder}s that let a reducer build the dictionary of a
+ * column locally from the values it receives.
+ *
+ * Created by xiefan on 16-11-16.
+ *
+ * TODO:sample,mergeDict
+ */
+public class DictionaryReducerLocalGenerator {
+
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DictionaryReducerLocalGenerator.class);
+
+    /** Patterns tried, in order, against the first value of a DATE column. */
+    private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" };
+
+    /**
+     * Returns a new, empty builder for a column of the given type: DATE columns get a
+     * {@link DateStrDictionary}, other date/time columns a {@link TimeStrDictionary}, numeric
+     * columns a number dictionary forest and all other columns a string trie dictionary forest.
+     *
+     * @param dataType the column data type, must not be null
+     * @return a new builder
+     * @throws NullPointerException if {@code dataType} is null
+     */
+    public static IDictionaryReducerLocalBuilder getBuilder(DataType dataType) {
+        Preconditions.checkNotNull(dataType, "dataType cannot be null");
+
+        IDictionaryReducerLocalBuilder builder;
+        if (dataType.isDateTimeFamily()) {
+            if (dataType.isDate())
+                builder = new DateDictBuilder();
+            else
+                builder = new TimeDictBuilder();
+        } else if (dataType.isNumberFamily()) {
+            builder = new NumberDictBuilder(0);
+        } else {
+            builder = new StringDictBuilder(0);
+        }
+        return builder;
+    }
+
+    /**
+     * Builds a {@link DateStrDictionary}. The date pattern is detected from the first value and
+     * every later value must parse with it. If the first value matches no pattern, addValue
+     * throws IllegalStateException right away; if a later value does not parse, it is logged and
+     * all further addValue calls, as well as build, throw IllegalStateException.
+     */
+    private static class DateDictBuilder implements IDictionaryReducerLocalBuilder {
+
+        private String matchPattern = null;
+
+        private boolean isRecognizeFormat = false;
+
+        private SimpleDateFormat sdf;
+
+        @Override
+        public Dictionary<String> build(int baseId) throws Exception {
+            if (isRecognizeFormat) {
+                return new DateStrDictionary(matchPattern, baseId);
+            } else {
+                throw new IllegalStateException("Date format not match");
+            }
+        }
+
+        @Override
+        public void addValue(String value) throws Exception {
+            if (matchPattern == null) { // first value: detect the date pattern
+                for (String ptn : DATE_PATTERNS) {
+                    matchPattern = ptn;
+                    SimpleDateFormat candidate = new SimpleDateFormat(ptn);
+                    try {
+                        candidate.parse(value);
+                        isRecognizeFormat = true;
+                        break;
+                    } catch (ParseException e) {
+                        // not this pattern, try the next one
+                    }
+                }
+                sdf = new SimpleDateFormat(matchPattern);
+            }
+            if (!isRecognizeFormat) {
+                throw new IllegalStateException("Date format not match");
+            }
+            try {
+                sdf.parse(value);
+            } catch (ParseException e) {
+                isRecognizeFormat = false;
+                logger.info("Unrecognized date value: " + value);
+            }
+        }
+    }
+
+    /** Builds a {@link TimeStrDictionary}, which is created without using the added values. */
+    private static class TimeDictBuilder implements IDictionaryReducerLocalBuilder {
+
+        @Override
+        public Dictionary<String> build(int baseId) {
+            return new TimeStrDictionary();
+        }
+
+        @Override
+        public void addValue(String value) {
+            // values are not needed to build a TimeStrDictionary
+        }
+    }
+
+    /**
+     * Builds a string trie dictionary forest. The base id is fixed at construction; the argument
+     * of {@link #build(int)} is ignored.
+     */
+    private static class StringDictBuilder implements IDictionaryReducerLocalBuilder {
+
+        private final TrieDictionaryForestBuilder<String> builder;
+
+        public StringDictBuilder(int baseId) {
+            builder = new TrieDictionaryForestBuilder<String>(new StringBytesConverter(), baseId);
+        }
+
+        @Override
+        public Dictionary<String> build(int baseId) {
+            return builder.build();
+        }
+
+        @Override
+        public void addValue(String value) {
+            builder.addValue(value);
+        }
+    }
+
+    /**
+     * Builds a number dictionary forest. The base id is fixed at construction; the argument of
+     * {@link #build(int)} is ignored.
+     */
+    public static class NumberDictBuilder implements IDictionaryReducerLocalBuilder {
+
+        private final NumberDictionaryForestBuilder builder;
+
+        public NumberDictBuilder(int baseId) {
+            builder = new NumberDictionaryForestBuilder(baseId);
+        }
+
+        @Override
+        public Dictionary<String> build(int baseId) {
+            return builder.build();
+        }
+
+        @Override
+        public void addValue(String value) {
+            builder.addValue(value);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java
new file mode 100644
index 0000000..19b1d28
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Dictionary;
+
+/**
+ * Collects the values of one column inside a reducer and builds a dictionary from them.
+ *
+ * Created by xiefan on 16-11-16.
+ */
+public interface IDictionaryReducerLocalBuilder {
+
+    /**
+     * Adds one column value.
+     *
+     * @param value the value to add
+     * @throws Exception if the value cannot be accepted by this builder
+     */
+    void addValue(String value) throws Exception;
+
+    /**
+     * Builds the dictionary from the values added so far.
+     *
+     * @param baseId requested base id of the dictionary; implementations that fix their base id
+     *               at construction may ignore it
+     * @return the built dictionary
+     * @throws Exception if the dictionary cannot be built
+     */
+    Dictionary<String> build(int baseId) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
new file mode 100644
index 0000000..0225737
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.dict;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.junit.Test;
+
+/**
+ * Round-trips dictionaries built by {@link DictionaryReducerLocalGenerator} through the
+ * "class name (writeUTF) + serialized dictionary" format that the reducers write and the
+ * dictionary creation step reads back.
+ *
+ * Created by xiefan on 16-11-23.
+ */
+public class DictionaryProviderTest {
+
+    @Test
+    public void testReadWrite() throws Exception {
+        // string dict
+        readWriteTest(getDict(DataType.getType("string"), Arrays.asList("a", "b").iterator()));
+
+        // number dict
+        readWriteTest(getDict(DataType.getType("long"), Arrays.asList("1", "2").iterator()));
+
+        // datetime dicts (date/time types other than DATE get a TimeStrDictionary)
+        readWriteTest(getDict(DataType.getType("datetime"), Arrays.asList("20161122", "20161123").iterator()));
+        readWriteTest(getDict(DataType.getType("datetime"), Arrays.asList("2016-11-22", "2016-11-23").iterator()));
+
+        // date dict: values written in two different date patterns must be rejected
+        try {
+            readWriteTest(getDict(DataType.getType("date"), Arrays.asList("2016-11-22", "20161122").iterator()));
+            fail("Date format not correct.Should throw exception");
+        } catch (IllegalStateException e) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testReadWriteTime() throws Exception {
+        readWriteTest(getDict(DataType.getType("timestamp"), Arrays.asList("2016-11-22 00:00:00", "2016-11-23 00:00:00").iterator()));
+    }
+
+    private Dictionary<String> getDict(DataType type, Iterator<String> values) throws Exception {
+        IDictionaryReducerLocalBuilder builder = DictionaryReducerLocalGenerator.getBuilder(type);
+        while (values.hasNext()) {
+            builder.addValue(values.next());
+        }
+        return builder.build(0);
+    }
+
+    /** Writes {@code dict} to a temporary file, reads it back and asserts both are equal. */
+    @SuppressWarnings("unchecked")
+    private void readWriteTest(Dictionary<String> dict) throws Exception {
+        File f = File.createTempFile("tmp_dict", null);
+        try {
+            DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+            try {
+                out.writeUTF(dict.getClass().getName());
+                dict.write(out);
+            } finally {
+                out.close();
+            }
+
+            Dictionary<String> dict2;
+            DataInputStream in = new DataInputStream(new FileInputStream(f));
+            try {
+                dict2 = (Dictionary<String>) ClassUtil.newInstance(in.readUTF());
+                dict2.readFields(in);
+            } finally {
+                in.close();
+            }
+
+            assertEquals(dict, dict2);
+        } finally {
+            f.delete();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
index 0dc1afa..56b1106 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
@@ -168,7 +168,7 @@ public class ColumnValueRange {
// remove invalid EQ/IN values and round start/end according to dictionary
public void preEvaluateWithDict(Dictionary<String> dict) {
- if (dict == null)
+ if (dict == null || dict.getSize() == 0)
return;
if (equalValues != null) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index 5d7cb21..63005f9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -19,15 +19,25 @@
package org.apache.kylin.engine.mr.steps;
import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.cli.DictionaryGeneratorCLI;
+import org.apache.kylin.dict.DictionaryProvider;
import org.apache.kylin.dict.DistinctColumnValuesProvider;
+import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.SortedColumnDFSFile;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.ReadableTable;
+import java.io.IOException;
+
/**
* @author ysong1
*/
@@ -48,13 +58,45 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
final String segmentID = getOptionValue(OPTION_SEGMENT_ID);
final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
- KylinConfig config = KylinConfig.getInstanceFromEnv();
+ final KylinConfig config = KylinConfig.getInstanceFromEnv();
DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, new DistinctColumnValuesProvider() {
@Override
public ReadableTable getDistinctValuesFor(TblColRef col) {
return new SortedColumnDFSFile(factColumnsInputPath + "/" + col.getName(), col.getType());
}
+ }, new DictionaryProvider() {
+
+            @Override
+            public Dictionary<String> getDictionary(TblColRef col) {
+                // Reads the dictionary a reducer wrote to <input>/<col>/<col>.RLD. Returning null makes
+                // DictionaryGeneratorCLI build the dictionary from the distinct values instead.
+                if (!config.isReducerLocalBuildDict()) {
+                    return null;
+                }
+                FSDataInputStream is = null;
+                try {
+                    Path colDir = new Path(factColumnsInputPath, col.getName());
+                    Path outputFile = new Path(colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
+                    Configuration conf = HadoopUtil.getCurrentConfiguration();
+                    // resolve the FileSystem from the full path; a bare file name carries no scheme or authority
+                    FileSystem fs = outputFile.getFileSystem(conf);
+                    is = fs.open(outputFile);
+                    String dictClassName = is.readUTF();
+                    @SuppressWarnings("unchecked")
+                    Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName);
+                    dict.readFields(is);
+                    logger.info("DictionaryProvider read dict from file : " + outputFile);
+                    return dict;
+                } catch (Exception e) {
+                    logger.warn("Failed to read reducer-local dictionary for " + col + ", it will be built from distinct values instead", e);
+                    return null;
+                } finally {
+                    if (is != null) {
+                        try {
+                            is.close();
+                        } catch (IOException e) {
+                            logger.warn("Failed to close dictionary file for " + col, e);
+                        }
+                    }
+                }
+            }
});
return returnCode;
http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 6e24d61..5511626 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -36,14 +38,19 @@ import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryReducerLocalGenerator;
+import org.apache.kylin.dict.IDictionaryReducerLocalBuilder;
import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsWriter;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,10 +80,19 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
+ //local build dict
+ private boolean isReducerLocalBuildDict;
+ private IDictionaryReducerLocalBuilder builder;
+ private FastDateFormat dateFormat;
+ private long timeMaxValue = Long.MIN_VALUE;
+ private long timeMinValue = Long.MAX_VALUE;
+ public static final String DICT_FILE_POSTFIX = ".RLD";
+ public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".PCI";
+ private boolean isPartitionCol = false;
+
@Override
protected void setup(Context context) throws IOException {
super.bindCurrentConfiguration(context.getConfiguration());
-
Configuration conf = context.getConfiguration();
KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
@@ -102,14 +118,36 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
} else if (collectStatistics && (taskId == numberOfTasks - 2)) {
// partition col
isStatistics = false;
+ isPartitionCol = true;
col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
colValues = Lists.newLinkedList();
+ DataType partitionColType = col.getType();
+ if (partitionColType.isDate()) {
+ dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
+ } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
+ dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+ } else if (partitionColType.isStringFamily()) {
+ String partitionDateFormat = cubeDesc.getModel().getPartitionDesc().getPartitionDateFormat();
+ if (StringUtils.isEmpty(partitionDateFormat)) {
+ partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
+ }
+ dateFormat = DateFormat.getDateFormat(partitionDateFormat);
+ } else {
+ throw new IllegalStateException("Type " + partitionColType + " is not valid partition column type");
+ }
} else {
// col
isStatistics = false;
col = columnList.get(ReducerIdToColumnIndex.get(taskId));
colValues = Lists.newLinkedList();
}
+
+ //local build dict
+ isReducerLocalBuildDict = config.isReducerLocalBuildDict();
+ if (col != null && isReducerLocalBuildDict) {
+ builder = DictionaryReducerLocalGenerator.getBuilder(col.getType());
+ }
+
}
private void initReducerIdToColumnIndex(KylinConfig config) throws IOException {
@@ -150,11 +188,26 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
}
}
} else {
- colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
- if (colValues.size() == 1000000) { //spill every 1 million
- logger.info("spill values to disk...");
- outputDistinctValues(col, colValues, context);
- colValues.clear();
+ if (isReducerLocalBuildDict) {
+ String value = new String(key.getBytes(), 1, key.getLength() - 1);
+ //partition col
+ try {
+ if (isPartitionCol) {
+ long time = dateFormat.parse(value).getTime();
+ timeMinValue = Math.min(timeMinValue, time);
+ timeMaxValue = Math.max(timeMaxValue, time);
+ }
+ builder.addValue(value);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ } else {
+ colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
+ if (colValues.size() == 1000000) { //spill every 1 million
+ logger.info("spill values to disk...");
+ outputDistinctValues(col, colValues, context);
+ colValues.clear();
+ }
}
}
@@ -191,12 +244,64 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
}
}
+ /**
+ * Persists a locally built dictionary for {@code col} to the file
+ * {@code col.getName() + DICT_FILE_POSTFIX}, obtained from getOutputStream.
+ * Layout: the dictionary's class name (writeUTF) followed by the dictionary's own write() payload.
+ *
+ * The stream is closed via try-with-resources instead of IOUtils.closeQuietly: closing an
+ * output stream may flush buffered bytes, so a close failure must surface as an IOException
+ * rather than silently leaving a truncated dictionary behind.
+ *
+ * NOTE(review): the directory is chosen inside getOutputStream from this reducer's col field,
+ * not from the col parameter; the visible caller (doCleanup) passes that same field.
+ *
+ * @param col column the dictionary belongs to; also names the output file
+ * @param dict dictionary to serialize
+ * @param context reducer context providing the job configuration
+ * @throws IOException if creating, writing or closing the file fails
+ */
+ private void outputDict(TblColRef col, Dictionary<String> dict, Context context) throws IOException {
+ final String fileName = col.getName() + DICT_FILE_POSTFIX;
+ final String dictClassName = dict.getClass().getName();
+ try (FSDataOutputStream out = getOutputStream(context, fileName)) {
+ out.writeUTF(dictClassName);
+ dict.write(out);
+ }
+ logger.info("reducer id is:" + taskId + " colName:" + col.getName() + " writing dict at file : " + fileName + " dict class:" + dictClassName);
+ }
+
+ /**
+ * Writes the partition column's observed time range (the timeMinValue / timeMaxValue fields
+ * accumulated during reduce) as two longs, min first then max, to the file
+ * {@code col.getName() + PARTITION_COL_INFO_FILE_POSTFIX} obtained from getOutputStream.
+ *
+ * try-with-resources is used instead of IOUtils.closeQuietly so that a failure while closing
+ * (which may flush buffered data) is reported rather than hidden.
+ *
+ * @param context reducer context providing the job configuration
+ * @throws IOException if creating, writing or closing the file fails
+ */
+ private void outputPartitionInfo(Context context) throws IOException {
+ final String fileName = col.getName() + PARTITION_COL_INFO_FILE_POSTFIX;
+ try (FSDataOutputStream out = getOutputStream(context, fileName)) {
+ out.writeLong(timeMinValue);
+ out.writeLong(timeMaxValue);
+ }
+ logger.info("write partition info for col : " + col.getName() + " minValue:" + timeMinValue + " maxValue:" + timeMaxValue);
+ }
+
+ /**
+ * Opens a new file {@code <CFG_OUTPUT_PATH>/<col name>/<outputFileName>} for writing, where
+ * "col" is this reducer's column field. The per-column directory is created if missing, and
+ * FileSystem.create(Path) overwrites an existing file of the same name.
+ *
+ * The FileSystem is resolved from the target path itself (Path.getFileSystem), so a fully
+ * qualified output path on a non-default file system is honored.
+ *
+ * The file is deliberately NOT registered with deleteOnExit. That call marks a path for
+ * deletion when its FileSystem is closed, and Hadoop closes cached FileSystems automatically
+ * at JVM shutdown, which would discard the very output this method produces.
+ *
+ * @param context reducer context providing the job configuration
+ * @param outputFileName name of the file inside the column directory
+ * @return an open stream; the caller owns it and must close it
+ * @throws IOException if the directory or the file cannot be created
+ */
+ private FSDataOutputStream getOutputStream(Context context, String outputFileName) throws IOException {
+ final Configuration conf = context.getConfiguration();
+ final Path colDir = new Path(conf.get(BatchConstants.CFG_OUTPUT_PATH), col.getName());
+ final Path outputFile = new Path(colDir, outputFileName);
+ final FileSystem fs = outputFile.getFileSystem(conf);
+ if (!fs.exists(colDir)) {
+ fs.mkdirs(colDir);
+ }
+ return fs.create(outputFile);
+ }
+
@Override
protected void doCleanup(Context context) throws IOException, InterruptedException {
if (isStatistics == false) {
- if (colValues.size() > 0) {
- outputDistinctValues(col, colValues, context);
- colValues.clear();
+ if (isReducerLocalBuildDict) {
+ try {
+ if (isPartitionCol) {
+ outputPartitionInfo(context);
+ }
+ Dictionary<String> dict = builder.build(0);
+ outputDict(col, dict, context);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ } else {
+ if (colValues.size() > 0) {
+ outputDistinctValues(col, colValues, context);
+ colValues.clear();
+ }
}
} else {
//output the hll info;
http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index eb06f07..977196c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -19,25 +19,23 @@
package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
-import java.text.ParseException;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.kylin.common.util.DateFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.SortedColumnDFSFile;
+import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,39 +81,25 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
private void updateTimeRange(CubeSegment segment) throws IOException {
final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
- final DataType partitionColType = partitionCol.getType();
- final FastDateFormat dateFormat;
- if (partitionColType.isDate()) {
- dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
- } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
- dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
- } else if (partitionColType.isStringFamily()) {
- String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
- if (StringUtils.isEmpty(partitionDateFormat)) {
- partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
- }
- dateFormat = DateFormat.getDateFormat(partitionDateFormat);
- } else {
- throw new IllegalStateException("Type " + partitionColType + " is not valid partition column type");
- }
-
- final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
- //final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(), -1);
- final ReadableTable readableTable = new SortedColumnDFSFile(factDistinctPath + "/" + partitionCol.getName(), partitionCol.getType());
- final ReadableTable.TableReader tableReader = readableTable.getReader();
+ // FactDistinctColumnsReducer writes, for the partition column, a side file holding exactly
+ // two longs: the minimum and then the maximum parsed time value (Date.getTime()).
+ final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+ final Path colDir = new Path(factColumnsInputPath, partitionCol.getName());
+ final Path partitionInfoFile = new Path(colDir, partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
+ // Resolve the FileSystem from the full path. A bare file name (Path.getName()) carries no
+ // scheme/authority, so it cannot select the file system the output path actually lives on.
+ final Configuration conf = HadoopUtil.getCurrentConfiguration();
+ final FileSystem fs = partitionInfoFile.getFileSystem(conf);
- long minValue = Long.MAX_VALUE, maxValue = Long.MIN_VALUE;
+ final long minValue;
+ final long maxValue;
+ FSDataInputStream is = null;
try {
- while (tableReader.next()) {
- long time = dateFormat.parse(tableReader.getRow()[0]).getTime();
- minValue = Math.min(minValue, time);
- maxValue = Math.max(maxValue, time);
- }
- } catch (ParseException e) {
- throw new IOException(e);
+ is = fs.open(partitionInfoFile);
+ minValue = is.readLong();
+ maxValue = is.readLong();
} finally {
- IOUtils.closeQuietly(tableReader);
+ IOUtils.closeQuietly(is);
}
-
+ logger.info("updateTimeRange step. minValue:" + minValue + " maxValue:" + maxValue);
segment.setDateRangeStart(minValue);
segment.setDateRangeEnd(maxValue);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 9af0faf..02aa64a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -437,7 +437,11 @@ public class CubeStorageQuery implements IStorageQuery {
// build row key range for each cube segment
StringBuilder sb = new StringBuilder("hbasekeyrange trace: ");
for (CubeSegment cubeSeg : segs) {
-
+ CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+ if (cubeDesc.getConfig().isSkippingEmptySegments() && cubeSeg.getInputRecords() == 0) {
+ logger.info("Skip cube segment {} because its input record is 0", cubeSeg);
+ continue;
+ }
// consider derived (lookup snapshot), filter on dimension may
// differ per segment
List<Collection<ColumnValueRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter, cubeSeg);