You are viewing a plain-text version of this content. (The hyperlink to its canonical version was lost in this plain-text rendering.)
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/04 05:43:20 UTC
[1/9] kylin git commit: minor, add util PrintHBaseConfig [Forced Update!]
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2135 3b01b94c0 -> 0a20a9b09 (forced update)
minor, add util PrintHBaseConfig
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b80762cd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b80762cd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b80762cd
Branch: refs/heads/KYLIN-2135
Commit: b80762cd1dcc410179ab366391588486e1028ffa
Parents: 93e3020
Author: Li Yang <li...@apache.org>
Authored: Thu Nov 3 13:50:44 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Nov 3 13:51:42 2016 +0800
----------------------------------------------------------------------
.../storage/hbase/util/PrintHBaseConfig.java | 62 ++++++++++++++++++++
1 file changed, 62 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/b80762cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java
new file mode 100644
index 0000000..634ebdf
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+
+/**
+ */
+public class PrintHBaseConfig {
+
+ public static void main(String[] args) throws IOException {
+ MyConfig config = new MyConfig(HBaseConfiguration.create());
+
+ if (args.length == 0) {
+ for (Map.Entry<Object,Object> item: config.getProps().entrySet()) {
+ System.out.println(item.getKey() + "=" + item.getValue());
+ }
+ System.exit(0);
+ }
+
+ if (args.length == 1) {
+ System.out.println(config.get(args[0]));
+ System.exit(0);
+ }
+
+ for (String arg : args) {
+ System.out.println(arg + "=" + config.get(arg));
+ }
+ System.exit(0);
+ }
+
+ private static class MyConfig extends Configuration {
+ MyConfig(Configuration other) {
+ super(other);
+ }
+
+ protected synchronized Properties getProps() {
+ return super.getProps();
+ }
+ }
+}
[7/9] kylin git commit: minor change on error messages
Posted by sh...@apache.org.
minor change on error messages
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c587b2ed
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c587b2ed
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c587b2ed
Branch: refs/heads/KYLIN-2135
Commit: c587b2ed3a968262a794e893f6af8fc109b02730
Parents: 235b123
Author: shaofengshi <sh...@apache.org>
Authored: Fri Nov 4 13:33:19 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Nov 4 13:33:19 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/engine/mr/common/AbstractHadoopJob.java | 8 ++++----
.../apache/kylin/engine/mr/common/HadoopStatusGetter.java | 2 +-
.../main/java/org/apache/kylin/source/kafka/KafkaSource.java | 8 ++++++++
3 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/c587b2ed/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 77791ce..21bb10e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -204,7 +204,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
StringUtil.appendWithSeparator(kylinDependency, filteredHive);
} else {
- logger.info("No hive dependency jars set in the environment, will find them from jvm:");
+ logger.info("No hive dependency jars set in the environment, will find them from classpath:");
try {
String hiveExecJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.hadoop.hive.ql.Driver"));
@@ -227,17 +227,17 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
// for kafka dependencies
if (kylinKafkaDependency != null) {
kylinKafkaDependency = kylinKafkaDependency.replace(":", ",");
- logger.info("Kafka Dependencies Before Filtered: " + kylinKafkaDependency);
+ logger.info("Kafka Dependencies: " + kylinKafkaDependency);
StringUtil.appendWithSeparator(kylinDependency, kylinKafkaDependency);
} else {
- logger.info("No Kafka dependency jars set in the environment, will find them from jvm:");
+ logger.info("No Kafka dependency jar set in the environment, will find them from classpath:");
try {
String kafkaClientJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer"));
StringUtil.appendWithSeparator(kylinDependency, kafkaClientJarPath);
logger.info("kafka jar file: " + kafkaClientJarPath);
} catch (ClassNotFoundException e) {
- logger.error("Cannot found kafka dependency jars: " + e);
+ logger.warn("Not found kafka client jar from classpath, it is optional for normal build: " + e);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c587b2ed/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
index 619de90..7dcb73e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
@@ -55,7 +55,7 @@ public class HadoopStatusGetter {
private final String mrJobId;
private final String yarnUrl;
- protected static final Logger logger = LoggerFactory.getLogger(HadoopStatusChecker.class);
+ protected static final Logger logger = LoggerFactory.getLogger(HadoopStatusGetter.class);
public HadoopStatusGetter(String yarnUrl, String mrJobId) {
this.yarnUrl = yarnUrl;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c587b2ed/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index bb676e6..7a5d94f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -107,6 +107,14 @@ public class KafkaSource implements ISource {
totalEndOffset += v;
}
+ if (totalStartOffset > totalEndOffset) {
+ throw new IllegalArgumentException("Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset);
+ }
+
+ if (totalStartOffset == totalEndOffset) {
+ throw new IllegalArgumentException("No new message comes, startOffset = endOffset:" + totalStartOffset);
+ }
+
result.setStartOffset(totalStartOffset);
result.setEndOffset(totalEndOffset);
[5/9] kylin git commit: KYLIN-1698 minor bug fix
Posted by sh...@apache.org.
KYLIN-1698 minor bug fix
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/13959553
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/13959553
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/13959553
Branch: refs/heads/KYLIN-2135
Commit: 1395955386bb505dd4ddcb5158823552ddc699e4
Parents: 5da63c2
Author: Li Yang <li...@apache.org>
Authored: Thu Nov 3 17:20:42 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Nov 3 17:22:21 2016 +0800
----------------------------------------------------------------------
.../src/main/java/org/apache/kylin/common/util/DateFormat.java | 4 ++++
.../main/java/org/apache/kylin/metadata/model/PartitionDesc.java | 4 ++--
2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/13959553/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
index 2472992..1d70a2d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
@@ -138,4 +138,8 @@ public class DateFormat {
}
return false;
}
+
+ public static boolean isDatePattern(String ptn) {
+ return COMPACT_DATE_PATTERN.equals(ptn) || DEFAULT_DATE_PATTERN.equals(ptn);
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/13959553/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
index 1006b83..127d5e1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
@@ -77,7 +77,7 @@ public class PartitionDesc {
return false;
DataType type = partitionDateColumnRef.getType();
- return type.isInt();
+ return (type.isInt() || type.isBigInt()) && DateFormat.isDatePattern(partitionDateFormat);
}
public boolean partitionColumnIsTimeMillis() {
@@ -85,7 +85,7 @@ public class PartitionDesc {
return false;
DataType type = partitionDateColumnRef.getType();
- return type.isBigInt();
+ return type.isBigInt() && !DateFormat.isDatePattern(partitionDateFormat);
}
public boolean isPartitioned() {
[8/9] kylin git commit: KYLIN-2135 Enlarge FactDistinctColumns reducer number
Posted by sh...@apache.org.
KYLIN-2135 Enlarge FactDistinctColumns reducer number
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fbd21321
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fbd21321
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fbd21321
Branch: refs/heads/KYLIN-2135
Commit: fbd213218d41178263cd8affc321178dcde0b9d3
Parents: c587b2e
Author: kangkaisen <ka...@live.com>
Authored: Wed Oct 26 19:35:20 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Nov 4 13:40:48 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 5 ++
.../java/org/apache/kylin/cube/CubeManager.java | 35 ++++++++++++
.../kylin/engine/mr/DFSFileTableReader.java | 59 ++++++++++++++++----
.../kylin/engine/mr/common/BatchConstants.java | 5 ++
.../mr/steps/FactDistinctColumnPartitioner.java | 11 +---
.../engine/mr/steps/FactDistinctColumnsJob.java | 18 +++++-
.../mr/steps/FactDistinctColumnsMapperBase.java | 17 +++++-
.../mr/steps/FactDistinctColumnsReducer.java | 36 ++++++++++--
.../mr/steps/FactDistinctHiveColumnsMapper.java | 12 +++-
9 files changed, 171 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index acc4eb1..9ac8142 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -374,6 +374,11 @@ abstract public class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.job.allow.empty.segment", "true"));
}
+ //UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns
+ public int getUHCReducerCount() {
+ return Integer.parseInt(getOptional("kylin.job.uhc.reducer.count", "3"));
+ }
+
public String getOverrideHiveTableLocation(String table) {
return getOptional("hive.table.location." + table.toUpperCase());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 16b468f..c04617d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -31,6 +31,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,6 +44,7 @@ import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DictionaryDesc;
import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
@@ -1047,4 +1049,37 @@ public class CubeManager implements IRealizationProvider {
}
return holes;
}
+
+ private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
+
+ //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
+ public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
+ List<TblColRef> factDictCols = getAllDictColumnsOnFact(cubeDesc);
+ int[] uhcIndex = new int[factDictCols.size()];
+
+ //add GlobalDictionaryColumns
+ List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries();
+ if (dictionaryDescList != null) {
+ for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
+ if (dictionaryDesc.getBuilderClass() != null && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) {
+ for (int i = 0; i < factDictCols.size(); i++) {
+ if (factDictCols.get(i).equals(dictionaryDesc.getColumnRef())) {
+ uhcIndex[i] = 1;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ //add ShardByColumns
+ Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns();
+ for (int i = 0; i < factDictCols.size(); i++) {
+ if (shardByColumns.contains(factDictCols.get(i))) {
+ uhcIndex[i] = 1;
+ }
+ }
+
+ return uhcIndex;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
index 300b123..dda1d6f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
@@ -23,10 +23,14 @@ import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
@@ -53,7 +57,7 @@ public class DFSFileTableReader implements TableReader {
private String filePath;
private String delim;
- private RowReader reader;
+ private List<RowReader> readerList;
private String curLine;
private String[] curColumns;
@@ -68,17 +72,33 @@ public class DFSFileTableReader implements TableReader {
this.filePath = filePath;
this.delim = delim;
this.expectedColumnNumber = expectedColumnNumber;
+ this.readerList = new ArrayList<RowReader>();
FileSystem fs = HadoopUtil.getFileSystem(filePath);
- try {
- this.reader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, filePath);
-
+ ArrayList<FileStatus> allFiles = new ArrayList<>();
+ FileStatus status = fs.getFileStatus(new Path(filePath));
+ if (status.isFile()) {
+ allFiles.add(status);
+ } else {
+ FileStatus[] listStatus = fs.listStatus(new Path(filePath));
+ allFiles.addAll(Arrays.asList(listStatus));
+ }
+
+ try {
+ for (FileStatus f : allFiles) {
+ RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
+ this.readerList.add(rowReader);
+ }
} catch (IOException e) {
if (isExceptionSayingNotSeqFile(e) == false)
throw e;
- this.reader = new CsvRowReader(fs, filePath);
+ this.readerList = new ArrayList<RowReader>();
+ for (FileStatus f : allFiles) {
+ RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
+ this.readerList.add(rowReader);
+ }
}
}
@@ -94,9 +114,20 @@ public class DFSFileTableReader implements TableReader {
@Override
public boolean next() throws IOException {
- curLine = reader.nextLine();
- curColumns = null;
- return curLine != null;
+ int curReaderIndex = -1;
+ RowReader curReader;
+
+ while (++curReaderIndex < readerList.size()) {
+ curReader = readerList.get(curReaderIndex);
+ curLine = curReader.nextLine();
+ curColumns = null;
+
+ if (curLine != null) {
+ return true;
+ }
+ }
+
+ return false;
}
public String getLine() {
@@ -145,9 +176,15 @@ public class DFSFileTableReader implements TableReader {
}
@Override
- public void close() throws IOException {
- if (reader != null)
- reader.close();
+ public void close() {
+ for (RowReader reader : readerList) {
+ try {
+ if (reader != null)
+ reader.close();
+ } catch (IOException e) {
+ logger.warn("close file failed:", e);
+ }
+ }
}
private String autoDetectDelim(String line) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index e4a8808..078d80f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -81,4 +81,9 @@ public interface BatchConstants {
String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder";
int NORMAL_RECORD_LOG_THRESHOLD = 100000;
int ERROR_RECORD_LOG_THRESHOLD = 100;
+
+ /**
+ * dictionaries builder class
+ */
+ String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index 6973c4b..b36e422 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -18,7 +18,6 @@
package org.apache.kylin.engine.mr.steps;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.kylin.common.util.BytesUtil;
@@ -26,22 +25,16 @@ import org.apache.kylin.common.util.BytesUtil;
/**
*/
public class FactDistinctColumnPartitioner extends Partitioner<Text, Text> {
- private Configuration conf;
-
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
-
if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_HLL) {
// the last reducer is for merging hll
return numReduceTasks - 1;
} else if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_PARTITION_COL) {
- // the last reducer is for merging hll
+ // the last but one reducer is for partition col
return numReduceTasks - 2;
} else {
- int colIndex = BytesUtil.readUnsigned(key.getBytes(), 0, 1);
- return colIndex;
+ return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 6603728..27f8c19 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -78,11 +78,27 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
CubeInstance cube = cubeMgr.getCube(cubeName);
List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
+ int reducerCount = columnsNeedDict.size();
+ int uhcReducerCount = cube.getConfig().getUHCReducerCount();
+
+ int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
+ for(int index : uhcIndex) {
+ if(index == 1) {
+ reducerCount += uhcReducerCount - 1;
+ }
+ }
+
+ if (reducerCount > 255) {
+ throw new IOException("The max reducer number for FactDistinctColumnsJob is 255, please decrease the 'kylin.job.global.dictionary.column.reducer.count' ");
+ }
+
+
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
+
logger.info("Starting: " + job.getJobName());
setJobClasspath(job, cube.getConfig());
@@ -101,7 +117,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
System.out.println("Found segment " + segment);
}
setupMapper(cube.getSegmentById(segmentID));
- setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 2 : columnsNeedDict.size());
+ setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
attachKylinPropsAndMetadata(cube, job.getConfiguration());
http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 3fa966d..196bf1e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -20,7 +20,9 @@ package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -58,6 +60,10 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
protected CubeJoinedFlatTableEnrich intermediateTableDesc;
protected int[] dictionaryColumnIndex;
+ protected int uhcReducerCount;
+ protected int[] uhcIndex;
+ protected Map<Integer, Integer> columnIndexToReducerBeginId = new HashMap<>();
+
@Override
protected void setup(Context context) throws IOException {
Configuration conf = context.getConfiguration();
@@ -73,7 +79,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
- intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
+ intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
dictionaryColumnIndex = new int[factDictCols.size()];
for (int i = 0; i < factDictCols.size(); i++) {
TblColRef colRef = factDictCols.get(i);
@@ -81,6 +87,15 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
}
+ uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc);
+ uhcReducerCount = cube.getConfig().getUHCReducerCount();
+ int count = 0;
+ for (int i = 0; i < uhcIndex.length; i++) {
+ columnIndexToReducerBeginId.put(i, count * (uhcReducerCount - 1) + i);
+ if (uhcIndex[i] == 1) {
+ count++;
+ }
+ }
}
protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index c8624bb..a99b857 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -67,6 +68,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
private boolean isStatistics = false;
private boolean isPartitionCol = false;
private KylinConfig cubeConfig;
+ private int uhcReducerCount;
+ private Map<Integer, Integer> ReducerIdToColumnIndex = new HashMap<>();
+ private int taskId;
+
protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
@Override
@@ -83,7 +88,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
boolean collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED));
int numberOfTasks = context.getNumReduceTasks();
- int taskId = context.getTaskAttemptID().getTaskID().getId();
+ taskId = context.getTaskAttemptID().getTaskID().getId();
+
+ uhcReducerCount = cube.getConfig().getUHCReducerCount();
+ initReducerIdToColumnIndex(config);
if (collectStatistics && (taskId == numberOfTasks - 1)) {
// hll
@@ -102,11 +110,25 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
// col
isStatistics = false;
isPartitionCol = false;
- col = columnList.get(taskId);
+ col = columnList.get(ReducerIdToColumnIndex.get(taskId));
colValues = Lists.newLinkedList();
}
}
+ private void initReducerIdToColumnIndex(KylinConfig config) throws IOException {
+ int[] uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc);
+ int count = 0;
+ for (int i = 0; i < uhcIndex.length; i++) {
+ ReducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + i, i);
+ if (uhcIndex[i] == 1) {
+ for (int j = 1; j < uhcReducerCount; j++) {
+ ReducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + j + i, i);
+ }
+ count++;
+ }
+ }
+ }
+
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
@@ -153,10 +175,16 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
final Configuration conf = context.getConfiguration();
final FileSystem fs = FileSystem.get(conf);
final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
- final Path outputFile = new Path(outputPath, col.getName());
+ final Path colDir = new Path(outputPath, col.getName());
+ final String fileName = col.getName() + "-" + taskId % uhcReducerCount;
+ final Path outputFile = new Path(colDir, fileName);
FSDataOutputStream out = null;
try {
+ if (!fs.exists(colDir)) {
+ fs.mkdirs(colDir);
+ }
+
if (fs.exists(outputFile)) {
out = fs.append(outputFile);
logger.info("append file " + outputFile);
@@ -189,7 +217,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
grandTotal += hll.getCountEstimate();
}
double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
-
+
int mapperNumber = baseCuboidRowCountInMappers.size();
writeMapperAndCuboidStatistics(context); // for human check
http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 86ef487..e06dafb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -141,7 +141,17 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
if (fieldValue == null)
continue;
int offset = keyBuffer.position();
- keyBuffer.put(Bytes.toBytes(i)[3]); // one byte is enough
+
+ int reducerIndex;
+ if (uhcIndex[i] == 0) {
+ //for the normal dictionary column
+ reducerIndex = columnIndexToReducerBeginId.get(i);
+ } else {
+ //for the uhc
+ reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
+ }
+
+ keyBuffer.put(Bytes.toBytes(reducerIndex)[3]);
keyBuffer.put(Bytes.toBytes(fieldValue));
outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
context.write(outputKey, EMPTY_TEXT);
[3/9] kylin git commit: minor, fix PrintHBaseConfig checkstyle
Posted by sh...@apache.org.
minor, fix PrintHBaseConfig checkstyle
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/50059e38
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/50059e38
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/50059e38
Branch: refs/heads/KYLIN-2135
Commit: 50059e388ea59c42fb9eeea2d1d92927114bcdbd
Parents: 03f3857
Author: Li Yang <li...@apache.org>
Authored: Thu Nov 3 14:14:05 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Nov 3 14:15:03 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/50059e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java
index 634ebdf..f9b7daf 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java
@@ -33,7 +33,7 @@ public class PrintHBaseConfig {
MyConfig config = new MyConfig(HBaseConfiguration.create());
if (args.length == 0) {
- for (Map.Entry<Object,Object> item: config.getProps().entrySet()) {
+ for (Map.Entry<Object, Object> item : config.getProps().entrySet()) {
System.out.println(item.getKey() + "=" + item.getValue());
}
System.exit(0);
[2/9] kylin git commit: catch up with cleanup jobs
Posted by sh...@apache.org.
catch up with cleanup jobs
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/03f38579
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/03f38579
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/03f38579
Branch: refs/heads/KYLIN-2135
Commit: 03f385795fd11daaef637cd9be837158cebc2c9c
Parents: b80762c
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Nov 3 14:11:34 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Nov 3 14:11:44 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/tool/ExtendCubeToHybridCLI.java | 2 +-
.../org/apache/kylin/tool/MetadataCleanupJob.java | 2 +-
.../org/apache/kylin/tool/StorageCleanupJob.java | 17 +++++------------
3 files changed, 7 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/03f38579/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
index 27fa973..dbf367f 100644
--- a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
@@ -170,7 +170,7 @@ public class ExtendCubeToHybridCLI {
CubeDesc newCubeDesc = CubeDesc.getCopyOf(cubeDesc);
newCubeDesc.setName(newCubeDescName);
newCubeDesc.updateRandomUuid();
- newCubeDesc.init(kylinConfig, metadataManager.getAllTablesMap());
+ newCubeDesc.init(kylinConfig);
newCubeDesc.setPartitionDateEnd(partitionDate);
newCubeDesc.calculateSignature();
cubeDescManager.createCubeDesc(newCubeDesc);
http://git-wip-us.apache.org/repos/asf/kylin/blob/03f38579/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java
index 94962ff..7040dbb 100644
--- a/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java
@@ -54,7 +54,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
private KylinConfig config = null;
- public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000L; // 2 days
+ public static final long TIME_THREADSHOLD = 1 * 3600 * 1000L; // 1 hour
public static final long TIME_THREADSHOLD_FOR_JOB = 30 * 24 * 3600 * 1000L; // 30 days
/*
http://git-wip-us.apache.org/repos/asf/kylin/blob/03f38579/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
index 2a2d1f3..3f82e94 100644
--- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -35,7 +35,6 @@ import javax.annotation.Nullable;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -71,10 +70,10 @@ public class StorageCleanupJob extends AbstractApplication {
@SuppressWarnings("static-access")
protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
- protected static final Option OPTION_FORCE = OptionBuilder.withArgName("force").hasArg().isRequired(false).withDescription("Warning: will delete any intermediate hive tables").create("force");
+ protected static final Option OPTION_FORCE = OptionBuilder.withArgName("force").hasArg().isRequired(false).withDescription("Warning: will delete all kylin intermediate hive tables").create("force");
protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
- public static final int TIME_THRESHOLD_DELETE_HTABLE = 10; // Unit minute
+ public static final int deleteTimeout = 10; // Unit minute
protected boolean delete = false;
protected boolean force = false;
@@ -82,7 +81,6 @@ public class StorageCleanupJob extends AbstractApplication {
private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- long TIME_THREADSHOLD = KylinConfig.getInstanceFromEnv().getStorageCleanupTimeThreshold();
// get all kylin hbase tables
HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
@@ -90,14 +88,9 @@ public class StorageCleanupJob extends AbstractApplication {
List<String> allTablesNeedToBeDropped = new ArrayList<String>();
for (HTableDescriptor desc : tableDescriptors) {
String host = desc.getValue(IRealizationConstants.HTableTag);
- String creationTime = desc.getValue(IRealizationConstants.HTableCreationTime);
if (KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix().equalsIgnoreCase(host)) {
//only take care htables that belongs to self, and created more than 2 days
- if (StringUtils.isEmpty(creationTime) || (System.currentTimeMillis() - Long.valueOf(creationTime) > TIME_THREADSHOLD)) {
- allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
- } else {
- logger.info("Exclude table " + desc.getTableName().getNameAsString() + " from drop list, as it is newly created");
- }
+ allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
}
}
@@ -119,9 +112,9 @@ public class StorageCleanupJob extends AbstractApplication {
FutureTask futureTask = new FutureTask(new DeleteHTableRunnable(hbaseAdmin, htableName));
executorService.execute(futureTask);
try {
- futureTask.get(TIME_THRESHOLD_DELETE_HTABLE, TimeUnit.MINUTES);
+ futureTask.get(deleteTimeout, TimeUnit.MINUTES);
} catch (TimeoutException e) {
- logger.warn("It fails to delete htable " + htableName + ", for it cost more than " + TIME_THRESHOLD_DELETE_HTABLE + " minutes!");
+ logger.warn("It fails to delete htable " + htableName + ", for it cost more than " + deleteTimeout + " minutes!");
futureTask.cancel(true);
} catch (Exception e) {
e.printStackTrace();
[9/9] kylin git commit: KYLIN-2135 minor format update
Posted by sh...@apache.org.
KYLIN-2135 minor format update
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0a20a9b0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0a20a9b0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0a20a9b0
Branch: refs/heads/KYLIN-2135
Commit: 0a20a9b093807dac60f758de9a3f6fc0a6d72a62
Parents: fbd2132
Author: shaofengshi <sh...@apache.org>
Authored: Thu Nov 3 18:49:50 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Nov 4 13:40:48 2016 +0800
----------------------------------------------------------------------
.../kylin/engine/mr/DFSFileTableReader.java | 92 ++++++++++----------
.../engine/mr/steps/FactDistinctColumnsJob.java | 34 ++++----
2 files changed, 61 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/0a20a9b0/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
index dda1d6f..173c908 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
@@ -23,14 +23,15 @@ import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
@@ -57,7 +58,7 @@ public class DFSFileTableReader implements TableReader {
private String filePath;
private String delim;
- private List<RowReader> readerList;
+ private List<RowReader> readerList;
private String curLine;
private String[] curColumns;
@@ -72,33 +73,33 @@ public class DFSFileTableReader implements TableReader {
this.filePath = filePath;
this.delim = delim;
this.expectedColumnNumber = expectedColumnNumber;
- this.readerList = new ArrayList<RowReader>();
+ this.readerList = new ArrayList<RowReader>();
FileSystem fs = HadoopUtil.getFileSystem(filePath);
- ArrayList<FileStatus> allFiles = new ArrayList<>();
- FileStatus status = fs.getFileStatus(new Path(filePath));
- if (status.isFile()) {
- allFiles.add(status);
- } else {
- FileStatus[] listStatus = fs.listStatus(new Path(filePath));
- allFiles.addAll(Arrays.asList(listStatus));
- }
-
- try {
- for (FileStatus f : allFiles) {
- RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
- this.readerList.add(rowReader);
- }
+ ArrayList<FileStatus> allFiles = new ArrayList<>();
+ FileStatus status = fs.getFileStatus(new Path(filePath));
+ if (status.isFile()) {
+ allFiles.add(status);
+ } else {
+ FileStatus[] listStatus = fs.listStatus(new Path(filePath));
+ allFiles.addAll(Arrays.asList(listStatus));
+ }
+
+ try {
+ for (FileStatus f : allFiles) {
+ RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
+ this.readerList.add(rowReader);
+ }
} catch (IOException e) {
if (isExceptionSayingNotSeqFile(e) == false)
throw e;
- this.readerList = new ArrayList<RowReader>();
- for (FileStatus f : allFiles) {
- RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
- this.readerList.add(rowReader);
- }
+ this.readerList = new ArrayList<RowReader>();
+ for (FileStatus f : allFiles) {
+ RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
+ this.readerList.add(rowReader);
+ }
}
}
@@ -114,20 +115,20 @@ public class DFSFileTableReader implements TableReader {
@Override
public boolean next() throws IOException {
- int curReaderIndex = -1;
- RowReader curReader;
-
- while (++curReaderIndex < readerList.size()) {
- curReader = readerList.get(curReaderIndex);
- curLine = curReader.nextLine();
- curColumns = null;
-
- if (curLine != null) {
- return true;
- }
- }
-
- return false;
+ int curReaderIndex = -1;
+ RowReader curReader;
+
+ while (++curReaderIndex < readerList.size()) {
+ curReader = readerList.get(curReaderIndex);
+ curLine = curReader.nextLine();
+ curColumns = null;
+
+ if (curLine != null) {
+ return true;
+ }
+ }
+
+ return false;
}
public String getLine() {
@@ -176,15 +177,10 @@ public class DFSFileTableReader implements TableReader {
}
@Override
- public void close() {
- for (RowReader reader : readerList) {
- try {
- if (reader != null)
- reader.close();
- } catch (IOException e) {
- logger.warn("close file failed:", e);
- }
- }
+ public void close() {
+ for (RowReader reader : readerList) {
+ IOUtils.closeQuietly(reader);
+ }
}
private String autoDetectDelim(String line) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/0a20a9b0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 27f8c19..c34ff94 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -78,27 +78,27 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
CubeInstance cube = cubeMgr.getCube(cubeName);
List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
- int reducerCount = columnsNeedDict.size();
- int uhcReducerCount = cube.getConfig().getUHCReducerCount();
-
- int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
- for(int index : uhcIndex) {
- if(index == 1) {
- reducerCount += uhcReducerCount - 1;
- }
- }
-
- if (reducerCount > 255) {
- throw new IOException("The max reducer number for FactDistinctColumnsJob is 255, please decrease the 'kylin.job.global.dictionary.column.reducer.count' ");
- }
-
-
+ int reducerCount = columnsNeedDict.size();
+ int uhcReducerCount = cube.getConfig().getUHCReducerCount();
+
+ int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
+ for(int index : uhcIndex) {
+ if(index == 1) {
+ reducerCount += uhcReducerCount - 1;
+ }
+ }
+
+ if (reducerCount > 255) {
+ throw new IllegalArgumentException("The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 'kylin.job.uhc.reducer.count'");
+ }
+
+
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
-
+
logger.info("Starting: " + job.getJobName());
setJobClasspath(job, cube.getConfig());
@@ -117,7 +117,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
System.out.println("Found segment " + segment);
}
setupMapper(cube.getSegmentById(segmentID));
- setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
+ setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
attachKylinPropsAndMetadata(cube, job.getConfiguration());
[4/9] kylin git commit: minor, update default engines in cloned cube
Posted by sh...@apache.org.
minor, update default engines in cloned cube
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5da63c23
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5da63c23
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5da63c23
Branch: refs/heads/KYLIN-2135
Commit: 5da63c233fdee07b11d8e1e7886b50eba0b56f31
Parents: 50059e3
Author: Li Yang <li...@apache.org>
Authored: Thu Nov 3 16:39:27 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Nov 3 16:42:51 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/rest/controller/CubeController.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/5da63c23/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index c70b506..10cd1f2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -29,6 +29,7 @@ import java.util.UUID;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -347,7 +348,11 @@ public class CubeController extends BasicController {
CubeDesc cubeDesc = cube.getDescriptor();
CubeDesc newCubeDesc = CubeDesc.getCopyOf(cubeDesc);
+
+ KylinConfig config = cubeService.getConfig();
newCubeDesc.setName(newCubeName);
+ newCubeDesc.setEngineType(config.getDefaultCubeEngine());
+ newCubeDesc.setStorageType(config.getDefaultStorageEngine());
CubeInstance newCube;
try {
[6/9] kylin git commit: minor, clearer log on 'Value not exist!' error
Posted by sh...@apache.org.
minor, clearer log on 'Value not exist!' error
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/235b123d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/235b123d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/235b123d
Branch: refs/heads/KYLIN-2135
Commit: 235b123d09c629b685cbaf029f205bbeab887d6f
Parents: 1395955
Author: Li Yang <li...@apache.org>
Authored: Thu Nov 3 18:21:55 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Nov 3 18:23:01 2016 +0800
----------------------------------------------------------------------
.../main/java/org/apache/kylin/common/util/Dictionary.java | 2 +-
.../java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java | 8 +++++++-
2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/235b123d/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
index 86ad5ff..0fb299c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
@@ -159,7 +159,7 @@ abstract public class Dictionary<T> implements Serializable {
else {
int id = getIdFromValueBytesImpl(value, offset, len, roundingFlag);
if (id < 0)
- throw new IllegalArgumentException("Value not exists!");
+ throw new IllegalArgumentException("Value '" + Bytes.toString(value, offset, len) + "' (" + Bytes.toStringBinary(value, offset, len) + ") not exists!");
return id;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/235b123d/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index f95cc21..aeeb893 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -118,7 +118,13 @@ public class CubeCodeSystem implements IGTCodeSystem {
if (dictEnc.getRoundingFlag() != roundingFlag) {
serializer = dictEnc.copy(roundingFlag).asDataTypeSerializer();
}
- serializer.serialize(value, buf);
+ try {
+ serializer.serialize(value, buf);
+ } catch (IllegalArgumentException ex) {
+ IllegalArgumentException rewordEx = new IllegalArgumentException("Column " + col + " value '" + value + "' met dictionary error: " + ex.getMessage());
+ rewordEx.setStackTrace(ex.getStackTrace());
+ throw rewordEx;
+ }
} else {
if (value instanceof String) {
// for dimensions; measures are converted by MeasureIngestor before reaching this point