You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/07 04:29:32 UTC
[15/50] [abbrv] kylin git commit: KYLIN-2217 Code review, refactor IDictionaryBuilder
KYLIN-2217 Code review, refactor IDictionaryBuilder
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/30cb1ac6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/30cb1ac6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/30cb1ac6
Branch: refs/heads/master-hbase1.x
Commit: 30cb1ac678b063ac164acd27591e8a1d0becafa0
Parents: 1af08e4
Author: Li Yang <li...@apache.org>
Authored: Wed Nov 30 15:30:15 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Nov 30 21:00:17 2016 +0800
----------------------------------------------------------------------
.../kylin/cube/cli/DictionaryGeneratorCLI.java | 2 +-
.../apache/kylin/dict/DictionaryGenerator.java | 165 ++++++++++++-------
.../apache/kylin/dict/DictionaryManager.java | 16 +-
.../apache/kylin/dict/DictionaryProvider.java | 4 +-
.../dict/DictionaryReducerLocalGenerator.java | 156 ------------------
.../kylin/dict/GlobalDictionaryBuilder.java | 36 ++--
.../apache/kylin/dict/IDictionaryBuilder.java | 13 +-
.../dict/IDictionaryReducerLocalBuilder.java | 31 ----
.../kylin/dict/DictionaryProviderTest.java | 63 +++----
.../engine/mr/steps/CreateDictionaryJob.java | 42 ++---
.../mr/steps/FactDistinctColumnsReducer.java | 115 +++++--------
.../mr/steps/FactDistinctHiveColumnsMapper.java | 9 +-
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 6 +-
13 files changed, 231 insertions(+), 427 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index a4e1df0..163c6ca 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -60,7 +60,7 @@ public class DictionaryGeneratorCLI {
for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) {
logger.info("Building dictionary for " + col);
ReadableTable inpTable = decideInputTable(cubeSeg.getModel(), col, factTableValueProvider);
- if (config.isReducerLocalBuildDict() && dictProvider != null) {
+ if (dictProvider != null) {
Dictionary<String> dict = dictProvider.getDictionary(col);
if (dict != null) {
cubeMgr.saveDictionary(cubeSeg, col, inpTable, dict);
http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 810a392..cd13d59 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -19,12 +19,11 @@
package org.apache.kylin.dict;
import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.metadata.datatype.DataType;
import org.slf4j.Logger;
@@ -40,9 +39,7 @@ public class DictionaryGenerator {
private static final Logger logger = LoggerFactory.getLogger(DictionaryGenerator.class);
- private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" };
-
- public static Dictionary<String> buildDictionary(DataType dataType, IDictionaryValueEnumerator valueEnumerator) throws IOException {
+ public static IDictionaryBuilder newDictionaryBuilder(DataType dataType) {
Preconditions.checkNotNull(dataType, "dataType cannot be null");
// build dict, case by data type
@@ -57,16 +54,33 @@ public class DictionaryGenerator {
} else {
builder = new StringDictBuilder();
}
+ return builder;
+ }
- return buildDictionary(builder, null, valueEnumerator);
+ public static Dictionary<String> buildDictionary(DataType dataType, IDictionaryValueEnumerator valueEnumerator) throws IOException {
+ return buildDictionary(newDictionaryBuilder(dataType), null, valueEnumerator);
}
- public static Dictionary<String> buildDictionary(IDictionaryBuilder builder, DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator) throws IOException {
+ static Dictionary<String> buildDictionary(IDictionaryBuilder builder, DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator) throws IOException {
int baseId = 0; // always 0 for now
int nSamples = 5;
ArrayList<String> samples = new ArrayList<String>(nSamples);
- Dictionary<String> dict = builder.build(dictInfo, valueEnumerator, baseId, nSamples, samples);
+ // init the builder
+ builder.init(dictInfo, baseId);
+
+ // add values
+ while (valueEnumerator.moveNext()) {
+ String value = valueEnumerator.current();
+
+ boolean accept = builder.addValue(value);
+
+ if (accept && samples.size() < nSamples && samples.contains(value) == false)
+ samples.add(value);
+ }
+
+ // build
+ Dictionary<String> dict = builder.build();
// log a few samples
StringBuilder buf = new StringBuilder();
@@ -88,81 +102,114 @@ public class DictionaryGenerator {
}
private static class DateDictBuilder implements IDictionaryBuilder {
+ private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" };
+
+ private int baseId;
+ private String datePattern;
+
@Override
- public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException {
- final int BAD_THRESHOLD = 0;
- String matchPattern = null;
- String value;
-
- for (String ptn : DATE_PATTERNS) {
- matchPattern = ptn; // be optimistic
- int badCount = 0;
- SimpleDateFormat sdf = new SimpleDateFormat(ptn);
- while (valueEnumerator.moveNext()) {
- value = valueEnumerator.current();
- if (value == null || value.length() == 0)
- continue;
+ public void init(DictionaryInfo info, int baseId) throws IOException {
+ this.baseId = baseId;
+ }
+ @Override
+ public boolean addValue(String value) {
+ if (StringUtils.isBlank(value)) // empty string is treated as null
+ return false;
+
+ // detect date pattern on the first value
+ if (datePattern == null) {
+ for (String p : DATE_PATTERNS) {
try {
- sdf.parse(value);
- if (returnSamples.size() < nSamples && returnSamples.contains(value) == false)
- returnSamples.add(value);
- } catch (ParseException e) {
- logger.info("Unrecognized date value: " + value);
- badCount++;
- if (badCount > BAD_THRESHOLD) {
- matchPattern = null;
- break;
- }
+ DateFormat.stringToDate(value, p);
+ datePattern = p;
+ break;
+ } catch (Exception e) {
+ // continue;
}
}
- if (matchPattern != null) {
- return new DateStrDictionary(matchPattern, baseId);
- }
+ if (datePattern == null)
+ throw new IllegalArgumentException("Unknown date pattern for input value: " + value);
}
+
+ // check the date format
+ DateFormat.stringToDate(value, datePattern);
+ return true;
+ }
+
+ @Override
+ public Dictionary<String> build() throws IOException {
+ if (datePattern == null)
+ datePattern = DATE_PATTERNS[0];
- throw new IllegalStateException("Unrecognized datetime value");
+ return new DateStrDictionary(datePattern, baseId);
}
}
private static class TimeDictBuilder implements IDictionaryBuilder {
+
@Override
- public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException {
+ public void init(DictionaryInfo info, int baseId) throws IOException {
+ }
+
+ @Override
+ public boolean addValue(String value) {
+ if (StringUtils.isBlank(value)) // empty string is treated as null
+ return false;
+
+ // check the time format
+ DateFormat.stringToMillis(value);
+ return true;
+ }
+
+ @Override
+ public Dictionary<String> build() throws IOException {
return new TimeStrDictionary(); // base ID is always 0
}
}
private static class StringDictBuilder implements IDictionaryBuilder {
+ TrieDictionaryForestBuilder builder;
+
@Override
- public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException {
- TrieDictionaryForestBuilder builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId);
- String value;
- while (valueEnumerator.moveNext()) {
- value = valueEnumerator.current();
- if (value == null)
- continue;
- builder.addValue(value);
- if (returnSamples.size() < nSamples && returnSamples.contains(value) == false)
- returnSamples.add(value);
- }
+ public void init(DictionaryInfo info, int baseId) throws IOException {
+ builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId);
+ }
+
+ @Override
+ public boolean addValue(String value) {
+ if (value == null)
+ return false;
+
+ builder.addValue(value);
+ return true;
+ }
+
+ @Override
+ public Dictionary<String> build() throws IOException {
return builder.build();
}
}
private static class NumberDictBuilder implements IDictionaryBuilder {
+ NumberDictionaryForestBuilder builder;
+
@Override
- public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException {
- NumberDictionaryForestBuilder builder = new NumberDictionaryForestBuilder(baseId);
- String value;
- while (valueEnumerator.moveNext()) {
- value = valueEnumerator.current();
- if (StringUtils.isBlank(value)) // empty string is null for numbers
- continue;
-
- builder.addValue(value);
- if (returnSamples.size() < nSamples && returnSamples.contains(value) == false)
- returnSamples.add(value);
- }
+ public void init(DictionaryInfo info, int baseId) throws IOException {
+ builder = new NumberDictionaryForestBuilder(baseId);
+ }
+
+ @Override
+ public boolean addValue(String value) {
+ if (StringUtils.isBlank(value)) // empty string is treated as null
+ return false;
+
+ builder.addValue(value);
+ return true;
+ }
+
+ @Override
+ public Dictionary<String> build() throws IOException {
return builder.build();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 0caef14..54bc1c4 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -87,10 +87,6 @@ public class DictionaryManager {
private KylinConfig config;
private LoadingCache<String, DictionaryInfo> dictCache; // resource
-
- // path ==>
- // DictionaryInfo
-
private DictionaryManager(KylinConfig config) {
this.config = config;
this.dictCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, DictionaryInfo>() {
@@ -276,12 +272,10 @@ public class DictionaryManager {
return buildDictionary(model, col, inpTable, null);
}
-
public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, ReadableTable inpTable, String builderClass) throws IOException {
if (inpTable.exists() == false)
return null;
-
logger.info("building dictionary for " + col);
DictionaryInfo dictInfo = createDictionaryInfo(model, col, inpTable);
@@ -303,10 +297,12 @@ public class DictionaryManager {
IDictionaryValueEnumerator columnValueEnumerator = null;
try {
columnValueEnumerator = new TableColumnValueEnumerator(inpTable.getReader(), dictInfo.getSourceColumnIndex());
- if (builderClass == null)
+ if (builderClass == null) {
dictionary = DictionaryGenerator.buildDictionary(DataType.getType(dictInfo.getDataType()), columnValueEnumerator);
- else
- dictionary = DictionaryGenerator.buildDictionary((IDictionaryBuilder) ClassUtil.newInstance(builderClass), dictInfo, columnValueEnumerator);
+ } else {
+ IDictionaryBuilder builder = (IDictionaryBuilder) ClassUtil.newInstance(builderClass);
+ dictionary = DictionaryGenerator.buildDictionary(builder, dictInfo, columnValueEnumerator);
+ }
} catch (Exception ex) {
throw new RuntimeException("Failed to create dictionary on " + col, ex);
} finally {
@@ -365,7 +361,7 @@ public class DictionaryManager {
while (join != null) {
if (join.isInnerJoin() == false)
return false;
-
+
TableRef table = join.getFKSide();
join = model.getJoinByPKSide(table);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java
index 6387535..8476f5c 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java
@@ -17,6 +17,8 @@
*/
package org.apache.kylin.dict;
+import java.io.IOException;
+
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.metadata.model.TblColRef;
@@ -24,5 +26,5 @@ import org.apache.kylin.metadata.model.TblColRef;
* Created by xiefan on 16-11-23.
*/
public interface DictionaryProvider {
- public Dictionary<String> getDictionary(TblColRef col);
+ public Dictionary<String> getDictionary(TblColRef col) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java
deleted file mode 100644
index 35d379a..0000000
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.dict;
-
-import com.google.common.base.Preconditions;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.metadata.datatype.DataType;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-
-/**
- * Created by xiefan on 16-11-16.
- *
- * TODO:sample,mergeDict
- */
-public class DictionaryReducerLocalGenerator {
-
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DictionaryReducerLocalGenerator.class);
-
- private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" };
-
- public static IDictionaryReducerLocalBuilder getBuilder(DataType dataType) {
- Preconditions.checkNotNull(dataType, "dataType cannot be null");
-
- IDictionaryReducerLocalBuilder builder;
- if (dataType.isDateTimeFamily()) {
- if (dataType.isDate())
- builder = new DateDictBuilder();
- else
- builder = new TimeDictBuilder();
- } else if (dataType.isNumberFamily()) {
- builder = new NumberDictBuilder(0);
- } else {
- builder = new StringDictBuilder(0);
- }
- return builder;
- }
-
- private static class DateDictBuilder implements IDictionaryReducerLocalBuilder {
-
- private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" };
-
- private String matchPattern = null;
-
- private boolean isRecognizeFormat = false;
-
- private SimpleDateFormat sdf;
-
- @Override
- public Dictionary<String> build(int baseId) throws Exception {
- if (isRecognizeFormat) {
- return new DateStrDictionary(matchPattern, baseId);
- } else {
- throw new IllegalStateException("Date format not match");
- }
- }
-
- @Override
- public void addValue(String value) throws Exception {
- if (matchPattern == null) { //init match pattern
- for (String ptn : DATE_PATTERNS) {
- matchPattern = ptn;
- SimpleDateFormat sdf = new SimpleDateFormat(ptn);
- try {
- sdf.parse(value);
- isRecognizeFormat = true;
- break;
- } catch (ParseException e) {
-
- }
- }
- sdf = new SimpleDateFormat(matchPattern);
- }
- if (!isRecognizeFormat) {
- throw new IllegalStateException("Date format not match");
- }
- try {
- sdf.parse(value);
- } catch (ParseException e) {
- isRecognizeFormat = false;
- logger.info("Unrecognized date value: " + value);
- }
- }
-
- }
-
- private static class TimeDictBuilder implements IDictionaryReducerLocalBuilder {
-
- @Override
- public Dictionary<String> build(int baseId) {
- return new TimeStrDictionary();
- }
-
- @Override
- public void addValue(String value) {
-
- }
-
- }
-
- private static class StringDictBuilder implements IDictionaryReducerLocalBuilder {
-
- private TrieDictionaryForestBuilder<String> builder;
-
- public StringDictBuilder(int baseId) {
- builder = new TrieDictionaryForestBuilder<String>(new StringBytesConverter(), 0);
- }
-
- @Override
- public Dictionary<String> build(int baseId) {
- return builder.build();
- }
-
- @Override
- public void addValue(String value) {
- builder.addValue(value);
- }
-
- }
-
- public static class NumberDictBuilder implements IDictionaryReducerLocalBuilder {
-
- private NumberDictionaryForestBuilder builder;
-
- public NumberDictBuilder(int baseId) {
- builder = new NumberDictionaryForestBuilder(baseId);
- }
-
- @Override
- public Dictionary<String> build(int baseId) {
- return builder.build();
- }
-
- @Override
- public void addValue(String value) {
- builder.addValue(value);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index 7adc262..b2a3664 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -36,8 +36,11 @@ import org.slf4j.LoggerFactory;
public class GlobalDictionaryBuilder implements IDictionaryBuilder {
private static final Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class);
+ AppendTrieDictionary.Builder<String> builder;
+ int baseId;
+
@Override
- public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException {
+ public void init(DictionaryInfo dictInfo, int baseId) throws IOException {
if (dictInfo == null) {
throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo");
}
@@ -55,28 +58,31 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
}
}
- AppendTrieDictionary.Builder<String> builder;
if (appendDicts.isEmpty()) {
logger.info("GlobalDict {} is empty, create new one", dictInfo.getResourceDir());
- builder = AppendTrieDictionary.Builder.create(dictDir);
+ this.builder = AppendTrieDictionary.Builder.create(dictDir);
} else if (appendDicts.size() == 1) {
logger.info("GlobalDict {} exist, append value", appendDicts.get(0));
AppendTrieDictionary dict = (AppendTrieDictionary) DictionaryManager.getInstance(KylinConfig.getInstanceFromEnv()).getDictionary(appendDicts.get(0));
- builder = AppendTrieDictionary.Builder.create(dict);
+ this.builder = AppendTrieDictionary.Builder.create(dict);
} else {
throw new IllegalStateException(String.format("GlobalDict %s should have 0 or 1 append dict but %d", dictInfo.getResourceDir(), appendDicts.size()));
}
-
- String value;
- while (valueEnumerator.moveNext()) {
- value = valueEnumerator.current();
- if (value == null) {
- continue;
- }
- builder.addValue(value);
- if (returnSamples.size() < nSamples && returnSamples.contains(value) == false)
- returnSamples.add(value);
- }
+
+ this.baseId = baseId;
+ }
+
+ @Override
+ public boolean addValue(String value) {
+ if (value == null)
+ return false;
+
+ builder.addValue(value);
+ return true;
+ }
+
+ @Override
+ public Dictionary<String> build() throws IOException {
return builder.build(baseId);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
index 8f95a2a..0934a7d 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
@@ -19,11 +19,20 @@
package org.apache.kylin.dict;
import java.io.IOException;
-import java.util.ArrayList;
import org.apache.kylin.common.util.Dictionary;
+/**
+ * A once-only builder for a dictionary.
+ */
public interface IDictionaryBuilder {
- Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException;
+ /** Sets the dictionary info for the dictionary being built. Mainly for GlobalDictionaryBuilder. */
+ void init(DictionaryInfo info, int baseId) throws IOException;
+
+ /** Adds a new value to the dictionary; returns whether it was accepted (not null) or not. */
+ boolean addValue(String value);
+
+ /** Build the dictionary */
+ Dictionary<String> build() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java
deleted file mode 100644
index 19b1d28..0000000
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import org.apache.kylin.common.util.Dictionary;
-
-/**
- * Created by xiefan on 16-11-16.
- */
-public interface IDictionaryReducerLocalBuilder {
- Dictionary<String> build(int baseId) throws Exception;
-
- void addValue(String value) throws Exception;
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
index 0225737..a4aee76 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
@@ -1,26 +1,20 @@
package org.apache.kylin.dict;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
-import java.io.BufferedOutputStream;
-import java.io.BufferedWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.Iterator;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.junit.Test;
/**
* Created by xiefan on 16-11-23.
@@ -28,53 +22,48 @@ import static org.junit.Assert.fail;
public class DictionaryProviderTest {
@Test
- public void testReadWrite() throws Exception{
+ public void testReadWrite() throws Exception {
//string dict
- Dictionary<String> dict = getDict(DataType.getType("string"),
- Arrays.asList(new String[]{"a","b"}).iterator());
+ Dictionary<String> dict = getDict(DataType.getType("string"), Arrays.asList(new String[] { "a", "b" }).iterator());
readWriteTest(dict);
//number dict
- Dictionary<String> dict2 = getDict(DataType.getType("long"),
- Arrays.asList(new String[]{"1","2"}).iterator());
+ Dictionary<String> dict2 = getDict(DataType.getType("long"), Arrays.asList(new String[] { "1", "2" }).iterator());
readWriteTest(dict2);
//date dict
- Dictionary<String> dict3 = getDict(DataType.getType("datetime"),
- Arrays.asList(new String[]{"20161122","20161123"}).iterator());
+ Dictionary<String> dict3 = getDict(DataType.getType("datetime"), Arrays.asList(new String[] { "20161122", "20161123" }).iterator());
readWriteTest(dict3);
//date dict
- Dictionary<String> dict4 = getDict(DataType.getType("datetime"),
- Arrays.asList(new String[]{"2016-11-22","2016-11-23"}).iterator());
+ Dictionary<String> dict4 = getDict(DataType.getType("datetime"), Arrays.asList(new String[] { "2016-11-22", "2016-11-23" }).iterator());
readWriteTest(dict4);
//date dict
try {
- Dictionary<String> dict5 = getDict(DataType.getType("date"),
- Arrays.asList(new String[]{"2016-11-22", "20161122"}).iterator());
+ Dictionary<String> dict5 = getDict(DataType.getType("date"), Arrays.asList(new String[] { "2016-11-22", "20161122" }).iterator());
readWriteTest(dict5);
fail("Date format not correct.Should throw exception");
- }catch (IllegalStateException e){
+ } catch (IllegalArgumentException e) {
//correct
}
}
@Test
- public void testReadWriteTime(){
+ public void testReadWriteTime() {
System.out.println(Long.MAX_VALUE);
System.out.println(Long.MIN_VALUE);
}
-
- private Dictionary<String> getDict(DataType type, Iterator<String> values) throws Exception{
- IDictionaryReducerLocalBuilder builder = DictionaryReducerLocalGenerator.getBuilder(type);
- while(values.hasNext()){
+ private Dictionary<String> getDict(DataType type, Iterator<String> values) throws Exception {
+ IDictionaryBuilder builder = DictionaryGenerator.newDictionaryBuilder(type);
+ builder.init(null, 0);
+ while (values.hasNext()) {
builder.addValue(values.next());
}
- return builder.build(0);
+ return builder.build();
}
- private void readWriteTest(Dictionary<String> dict) throws Exception{
+ private void readWriteTest(Dictionary<String> dict) throws Exception {
final String path = "src/test/resources/dict/tmp_dict";
File f = new File(path);
f.deleteOnExit();
@@ -93,15 +82,9 @@ public class DictionaryProviderTest {
String dictClassName2 = in.readUTF();
dict2 = (Dictionary<String>) ClassUtil.newInstance(dictClassName2);
dict2.readFields(in);
- }catch(IOException e){
- e.printStackTrace();
- }finally {
- if(in != null){
- try {
- in.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
+ } finally {
+ if (in != null) {
+ in.close();
}
}
assertTrue(dict.equals(dict2));
http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index 63005f9..4985503 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -18,8 +18,10 @@
package org.apache.kylin.engine.mr.steps;
+import java.io.IOException;
+
import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,16 +38,8 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.ReadableTable;
-import java.io.IOException;
-
-/**
- * @author ysong1
- */
-
public class CreateDictionaryJob extends AbstractHadoopJob {
- private int returnCode = 0;
-
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
@@ -68,38 +62,28 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
}, new DictionaryProvider() {
@Override
- public Dictionary<String> getDictionary(TblColRef col) {
- if (!config.isReducerLocalBuildDict()) {
+ public Dictionary<String> getDictionary(TblColRef col) throws IOException {
+ Path colDir = new Path(factColumnsInputPath, col.getName());
+ Path dictFile = new Path(colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
+ FileSystem fs = HadoopUtil.getFileSystem(dictFile.toString());
+ if (fs.exists(dictFile) == false)
return null;
- }
+
FSDataInputStream is = null;
try {
- Path colDir = new Path(factColumnsInputPath, col.getName());
- Path outputFile = new Path(colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
- Configuration conf = HadoopUtil.getCurrentConfiguration();
- FileSystem fs = HadoopUtil.getFileSystem(outputFile.getName());
- is = fs.open(outputFile);
+ is = fs.open(dictFile);
String dictClassName = is.readUTF();
Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName);
dict.readFields(is);
- logger.info("DictionaryProvider read dict form file : " + outputFile.getName());
+ logger.info("DictionaryProvider read dict from file: " + dictFile);
return dict;
- } catch (Exception e) {
- e.printStackTrace();
- return null;
} finally {
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
+ IOUtils.closeQuietly(is);
}
}
});
- return returnCode;
+ return 0;
}
public static void main(String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 5511626..8933ee2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -27,8 +27,6 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -43,14 +41,13 @@ import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.DictionaryReducerLocalGenerator;
-import org.apache.kylin.dict.IDictionaryReducerLocalBuilder;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IDictionaryBuilder;
import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsWriter;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +59,8 @@ import com.google.common.collect.Maps;
*/
public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableKey, Text, NullWritable, Text> {
+ protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
+
private List<TblColRef> columnList;
private String statisticsOutput = null;
private List<Long> baseCuboidRowCountInMappers;
@@ -75,19 +74,16 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
private boolean isStatistics = false;
private KylinConfig cubeConfig;
private int uhcReducerCount;
- private Map<Integer, Integer> ReducerIdToColumnIndex = new HashMap<>();
+ private Map<Integer, Integer> reducerIdToColumnIndex = new HashMap<>();
private int taskId;
- protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
-
//local build dict
private boolean isReducerLocalBuildDict;
- private IDictionaryReducerLocalBuilder builder;
- private FastDateFormat dateFormat;
+ private IDictionaryBuilder builder;
private long timeMaxValue = Long.MIN_VALUE;
private long timeMinValue = Long.MAX_VALUE;
- public static final String DICT_FILE_POSTFIX = ".RLD";
- public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".PCI";
+ public static final String DICT_FILE_POSTFIX = ".rldict";
+ public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".pci";
private boolean isPartitionCol = false;
@Override
@@ -121,43 +117,29 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
isPartitionCol = true;
col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
colValues = Lists.newLinkedList();
- DataType partitionColType = col.getType();
- if (partitionColType.isDate()) {
- dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
- } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
- dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
- } else if (partitionColType.isStringFamily()) {
- String partitionDateFormat = cubeDesc.getModel().getPartitionDesc().getPartitionDateFormat();
- if (StringUtils.isEmpty(partitionDateFormat)) {
- partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
- }
- dateFormat = DateFormat.getDateFormat(partitionDateFormat);
- } else {
- throw new IllegalStateException("Type " + partitionColType + " is not valid partition column type");
- }
} else {
- // col
+ // normal col
isStatistics = false;
- col = columnList.get(ReducerIdToColumnIndex.get(taskId));
+ col = columnList.get(reducerIdToColumnIndex.get(taskId));
colValues = Lists.newLinkedList();
+
+ // local build dict
+ isReducerLocalBuildDict = config.isReducerLocalBuildDict();
+ if (col != null && isReducerLocalBuildDict) {
+ builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
+ builder.init(null, 0);
+ }
}
-
- //local build dict
- isReducerLocalBuildDict = config.isReducerLocalBuildDict();
- if (col != null && isReducerLocalBuildDict) {
- builder = DictionaryReducerLocalGenerator.getBuilder(col.getType());
- }
-
}
private void initReducerIdToColumnIndex(KylinConfig config) throws IOException {
int[] uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc);
int count = 0;
for (int i = 0; i < uhcIndex.length; i++) {
- ReducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + i, i);
+ reducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + i, i);
if (uhcIndex[i] == 1) {
for (int j = 1; j < uhcReducerCount; j++) {
- ReducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + j + i, i);
+ reducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + j + i, i);
}
count++;
}
@@ -167,7 +149,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
@Override
public void doReduce(SelfDefineSortableKey skey, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Text key = skey.getText();
- if (isStatistics == true) {
+ if (isStatistics) {
// for hll
long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
for (Text value : values) {
@@ -187,20 +169,17 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
cuboidHLLMap.put(cuboidId, hll);
}
}
+ } else if (isPartitionCol) {
+ // partition col
+ String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
+ long time = DateFormat.stringToMillis(value);
+ timeMinValue = Math.min(timeMinValue, time);
+ timeMaxValue = Math.max(timeMaxValue, time);
} else {
+ // normal col
if (isReducerLocalBuildDict) {
- String value = new String(key.getBytes(), 1, key.getLength() - 1);
- //partition col
- try {
- if (isPartitionCol) {
- long time = dateFormat.parse(value).getTime();
- timeMinValue = Math.min(timeMinValue, time);
- timeMaxValue = Math.max(timeMaxValue, time);
- }
- builder.addValue(value);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
+ builder.addValue(value);
} else {
colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
if (colValues.size() == 1000000) { //spill every 1 million
@@ -210,7 +189,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
}
}
}
-
}
private void outputDistinctValues(TblColRef col, Collection<ByteArray> values, Context context) throws IOException {
@@ -286,25 +264,8 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
@Override
protected void doCleanup(Context context) throws IOException, InterruptedException {
- if (isStatistics == false) {
- if (isReducerLocalBuildDict) {
- try {
- if (isPartitionCol) {
- outputPartitionInfo(context);
- }
- Dictionary<String> dict = builder.build(0);
- outputDict(col, dict, context);
- } catch (Exception e) {
- e.printStackTrace();
- }
- } else {
- if (colValues.size() > 0) {
- outputDistinctValues(col, colValues, context);
- colValues.clear();
- }
- }
- } else {
- //output the hll info;
+ if (isStatistics) {
+ // output the hll info
long grandTotal = 0;
for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) {
grandTotal += hll.getCountEstimate();
@@ -316,6 +277,20 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
writeMapperAndCuboidStatistics(context); // for human check
CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), //
cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio);
+ } else if (isPartitionCol) {
+ // partition col
+ outputPartitionInfo(context);
+ } else {
+ // normal col
+ if (isReducerLocalBuildDict) {
+ Dictionary<String> dict = builder.build();
+ outputDict(col, dict, context);
+ } else {
+ if (colValues.size() > 0) {
+ outputDistinctValues(col, colValues, context);
+ colValues.clear();
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 762047b..a5c8fc0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -97,15 +97,8 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
// if partition col not on cube, no need
needFetchPartitionCol = false;
} else {
- for (int x : dictionaryColumnIndex) {
- if (x == partitionColumnIndex) {
- // if partition col already build dict, no need
- needFetchPartitionCol = false;
- break;
- }
- }
+ needFetchPartitionCol = true;
}
-
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index 977196c..d3becfe 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -84,8 +83,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
Path colDir = new Path(factColumnsInputPath, partitionCol.getName());
Path outputFile = new Path(colDir, partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
- Configuration conf = HadoopUtil.getCurrentConfiguration();
- FileSystem fs = HadoopUtil.getFileSystem(outputFile.getName());
+ FileSystem fs = HadoopUtil.getFileSystem(outputFile.toString());
FSDataInputStream is = null;
long minValue = Long.MAX_VALUE, maxValue = Long.MIN_VALUE;
try {
@@ -94,8 +92,6 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
long max = is.readLong();
minValue = Math.min(min, minValue);
maxValue = Math.max(max, maxValue);
- } catch (IOException e) {
- throw new IOException(e);
} finally {
IOUtils.closeQuietly(is);
}