You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/04 05:43:20 UTC

[1/9] kylin git commit: minor, add util PrintHBaseConfig [Forced Update!]

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2135 3b01b94c0 -> 0a20a9b09 (forced update)


minor, add util PrintHBaseConfig


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b80762cd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b80762cd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b80762cd

Branch: refs/heads/KYLIN-2135
Commit: b80762cd1dcc410179ab366391588486e1028ffa
Parents: 93e3020
Author: Li Yang <li...@apache.org>
Authored: Thu Nov 3 13:50:44 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Nov 3 13:51:42 2016 +0800

----------------------------------------------------------------------
 .../storage/hbase/util/PrintHBaseConfig.java    | 62 ++++++++++++++++++++
 1 file changed, 62 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/b80762cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java
new file mode 100644
index 0000000..634ebdf
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+
+/**
+ * Command-line utility that prints HBase configuration to stdout: every property
+ * as key=value when run without arguments, the bare value for a single key, or
+ * key=value for each key when several keys are given. Always exits with status 0.
+ */
+public class PrintHBaseConfig {
+
+    public static void main(String[] args) throws IOException {
+        ExposedConfig hbaseConf = new ExposedConfig(HBaseConfiguration.create());
+
+        if (args.length == 0) {
+            for (Map.Entry<Object, Object> entry : hbaseConf.getProps().entrySet()) {
+                System.out.println(entry.getKey() + "=" + entry.getValue());
+            }
+        } else if (args.length == 1) {
+            System.out.println(hbaseConf.get(args[0]));
+        } else {
+            for (String key : args) {
+                System.out.println(key + "=" + hbaseConf.get(key));
+            }
+        }
+        System.exit(0);
+    }
+
+    /** Copy of a Configuration that re-declares getProps(), presumably so main can reach the property table. */
+    private static class ExposedConfig extends Configuration {
+        ExposedConfig(Configuration other) {
+            super(other);
+        }
+
+        protected synchronized Properties getProps() {
+            return super.getProps();
+        }
+    }
+}


[7/9] kylin git commit: minor change on error messages

Posted by sh...@apache.org.
minor change on error messages

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c587b2ed
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c587b2ed
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c587b2ed

Branch: refs/heads/KYLIN-2135
Commit: c587b2ed3a968262a794e893f6af8fc109b02730
Parents: 235b123
Author: shaofengshi <sh...@apache.org>
Authored: Fri Nov 4 13:33:19 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Nov 4 13:33:19 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/engine/mr/common/AbstractHadoopJob.java | 8 ++++----
 .../apache/kylin/engine/mr/common/HadoopStatusGetter.java    | 2 +-
 .../main/java/org/apache/kylin/source/kafka/KafkaSource.java | 8 ++++++++
 3 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c587b2ed/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 77791ce..21bb10e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -204,7 +204,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             StringUtil.appendWithSeparator(kylinDependency, filteredHive);
         } else {
 
-            logger.info("No hive dependency jars set in the environment, will find them from jvm:");
+            logger.info("No hive dependency jars set in the environment, will find them from classpath:");
 
             try {
                 String hiveExecJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.hadoop.hive.ql.Driver"));
@@ -227,17 +227,17 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         // for kafka dependencies
         if (kylinKafkaDependency != null) {
             kylinKafkaDependency = kylinKafkaDependency.replace(":", ",");
-            logger.info("Kafka Dependencies Before Filtered: " + kylinKafkaDependency);
+            logger.info("Kafka Dependencies: " + kylinKafkaDependency);
             StringUtil.appendWithSeparator(kylinDependency, kylinKafkaDependency);
         } else {
-            logger.info("No Kafka dependency jars set in the environment, will find them from jvm:");
+            logger.info("No Kafka dependency jar set in the environment, will find them from classpath:");
             try {
                 String kafkaClientJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer"));
                 StringUtil.appendWithSeparator(kylinDependency, kafkaClientJarPath);
                 logger.info("kafka jar file: " + kafkaClientJarPath);
 
             } catch (ClassNotFoundException e) {
-                logger.error("Cannot found kafka dependency jars: " + e);
+                logger.warn("Not found kafka client jar from classpath, it is optional for normal build: " + e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c587b2ed/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
index 619de90..7dcb73e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
@@ -55,7 +55,7 @@ public class HadoopStatusGetter {
     private final String mrJobId;
     private final String yarnUrl;
 
-    protected static final Logger logger = LoggerFactory.getLogger(HadoopStatusChecker.class);
+    protected static final Logger logger = LoggerFactory.getLogger(HadoopStatusGetter.class);
 
     public HadoopStatusGetter(String yarnUrl, String mrJobId) {
         this.yarnUrl = yarnUrl;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c587b2ed/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index bb676e6..7a5d94f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -107,6 +107,14 @@ public class KafkaSource implements ISource {
             totalEndOffset += v;
         }
 
+        if (totalStartOffset > totalEndOffset) {
+            throw new IllegalArgumentException("Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset);
+        }
+
+        if (totalStartOffset == totalEndOffset) {
+            throw new IllegalArgumentException("No new message comes, startOffset = endOffset:" + totalStartOffset);
+        }
+
         result.setStartOffset(totalStartOffset);
         result.setEndOffset(totalEndOffset);
 


[5/9] kylin git commit: KYLIN-1698 minor bug fix

Posted by sh...@apache.org.
KYLIN-1698 minor bug fix


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/13959553
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/13959553
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/13959553

Branch: refs/heads/KYLIN-2135
Commit: 1395955386bb505dd4ddcb5158823552ddc699e4
Parents: 5da63c2
Author: Li Yang <li...@apache.org>
Authored: Thu Nov 3 17:20:42 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Nov 3 17:22:21 2016 +0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/kylin/common/util/DateFormat.java   | 4 ++++
 .../main/java/org/apache/kylin/metadata/model/PartitionDesc.java | 4 ++--
 2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/13959553/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
index 2472992..1d70a2d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
@@ -138,4 +138,8 @@ public class DateFormat {
         }
         return false;
     }
+
+    public static boolean isDatePattern(String ptn) {
+        return COMPACT_DATE_PATTERN.equals(ptn) || DEFAULT_DATE_PATTERN.equals(ptn);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/13959553/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
index 1006b83..127d5e1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
@@ -77,7 +77,7 @@ public class PartitionDesc {
             return false;
         
         DataType type = partitionDateColumnRef.getType();
-        return type.isInt();
+        return (type.isInt() || type.isBigInt()) && DateFormat.isDatePattern(partitionDateFormat);
     }
 
     public boolean partitionColumnIsTimeMillis() {
@@ -85,7 +85,7 @@ public class PartitionDesc {
             return false;
         
         DataType type = partitionDateColumnRef.getType();
-        return type.isBigInt();
+        return type.isBigInt() && !DateFormat.isDatePattern(partitionDateFormat);
     }
 
     public boolean isPartitioned() {


[8/9] kylin git commit: KYLIN-2135 Enlarge FactDistinctColumns reducer number

Posted by sh...@apache.org.
KYLIN-2135 Enlarge FactDistinctColumns reducer number

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fbd21321
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fbd21321
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fbd21321

Branch: refs/heads/KYLIN-2135
Commit: fbd213218d41178263cd8affc321178dcde0b9d3
Parents: c587b2e
Author: kangkaisen <ka...@live.com>
Authored: Wed Oct 26 19:35:20 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Nov 4 13:40:48 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  5 ++
 .../java/org/apache/kylin/cube/CubeManager.java | 35 ++++++++++++
 .../kylin/engine/mr/DFSFileTableReader.java     | 59 ++++++++++++++++----
 .../kylin/engine/mr/common/BatchConstants.java  |  5 ++
 .../mr/steps/FactDistinctColumnPartitioner.java | 11 +---
 .../engine/mr/steps/FactDistinctColumnsJob.java | 18 +++++-
 .../mr/steps/FactDistinctColumnsMapperBase.java | 17 +++++-
 .../mr/steps/FactDistinctColumnsReducer.java    | 36 ++++++++++--
 .../mr/steps/FactDistinctHiveColumnsMapper.java | 12 +++-
 9 files changed, 171 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index acc4eb1..9ac8142 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -374,6 +374,11 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.job.allow.empty.segment", "true"));
     }
 
+    // Reducer count used for each UHC (ultra high cardinality) column, i.e. the ShardByColumns and the GlobalDictionaryColumns; "3" is presumably the fallback when the key is unset
+    public int getUHCReducerCount() {
+        return Integer.parseInt(getOptional("kylin.job.uhc.reducer.count", "3"));
+    }
+
     public String getOverrideHiveTableLocation(String table) {
         return getOptional("hive.table.location." + table.toUpperCase());
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 16b468f..c04617d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -31,6 +31,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -43,6 +44,7 @@ import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DictionaryDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
@@ -1047,4 +1049,37 @@ public class CubeManager implements IRealizationProvider {
         }
         return holes;
     }
+
+    private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
+
+    // Returns one flag per dictionary column on the fact table: 1 for UHC (ultra high cardinality) columns, i.e. GlobalDictionaryColumns and ShardByColumns, else 0
+    public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
+        List<TblColRef> dictCols = getAllDictColumnsOnFact(cubeDesc);
+        int[] flags = new int[dictCols.size()];
+
+        // flag the columns whose dictionary uses the global dictionary builder
+        List<DictionaryDesc> dictDescs = cubeDesc.getDictionaries();
+        if (dictDescs != null) {
+            for (DictionaryDesc desc : dictDescs) {
+                if (desc.getBuilderClass() == null || !desc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) {
+                    continue;
+                }
+                for (int pos = 0; pos < dictCols.size(); pos++) {
+                    if (dictCols.get(pos).equals(desc.getColumnRef())) {
+                        flags[pos] = 1;
+                        break;
+                    }
+                }
+            }
+        }
+
+        // flag the shard-by columns
+        Set<TblColRef> shardCols = cubeDesc.getShardByColumns();
+        for (int pos = 0; pos < dictCols.size(); pos++) {
+            if (shardCols.contains(dictCols.get(pos))) {
+                flags[pos] = 1;
+            }
+        }
+        return flags;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
index 300b123..dda1d6f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
@@ -23,10 +23,14 @@ import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -53,7 +57,7 @@ public class DFSFileTableReader implements TableReader {
 
     private String filePath;
     private String delim;
-    private RowReader reader;
+    private List<RowReader> readerList;
 
     private String curLine;
     private String[] curColumns;
@@ -68,17 +72,33 @@ public class DFSFileTableReader implements TableReader {
         this.filePath = filePath;
         this.delim = delim;
         this.expectedColumnNumber = expectedColumnNumber;
+        this.readerList = new ArrayList<RowReader>();
 
         FileSystem fs = HadoopUtil.getFileSystem(filePath);
 
-        try {
-            this.reader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, filePath);
-
+        ArrayList<FileStatus> allFiles = new ArrayList<>();
+        FileStatus status = fs.getFileStatus(new Path(filePath));
+        if (status.isFile()) {
+            allFiles.add(status);
+        } else {
+            FileStatus[] listStatus = fs.listStatus(new Path(filePath));
+            allFiles.addAll(Arrays.asList(listStatus));
+        }
+
+        try {
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         } catch (IOException e) {
             if (isExceptionSayingNotSeqFile(e) == false)
                 throw e;
 
-            this.reader = new CsvRowReader(fs, filePath);
+            this.readerList = new ArrayList<RowReader>();
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         }
     }
 
@@ -94,9 +114,20 @@ public class DFSFileTableReader implements TableReader {
 
     @Override
     public boolean next() throws IOException {
-        curLine = reader.nextLine();
-        curColumns = null;
-        return curLine != null;
+        int curReaderIndex = -1;
+        RowReader curReader;
+
+        while (++curReaderIndex < readerList.size()) {
+            curReader = readerList.get(curReaderIndex);
+            curLine = curReader.nextLine();
+            curColumns = null;
+
+            if (curLine != null) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
     public String getLine() {
@@ -145,9 +176,15 @@ public class DFSFileTableReader implements TableReader {
     }
 
     @Override
-    public void close() throws IOException {
-        if (reader != null)
-            reader.close();
+    public void close() {
+        for (RowReader reader : readerList) {
+            try {
+                if (reader != null)
+                    reader.close();
+            } catch (IOException e) {
+                logger.warn("close file failed:", e);
+            }
+        }
     }
 
     private String autoDetectDelim(String line) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index e4a8808..078d80f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -81,4 +81,9 @@ public interface BatchConstants {
     String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder";
     int NORMAL_RECORD_LOG_THRESHOLD = 100000;
     int ERROR_RECORD_LOG_THRESHOLD = 100;
+
+    /**
+     * dictionaries builder class
+     */
+    String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index 6973c4b..b36e422 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.kylin.common.util.BytesUtil;
@@ -26,22 +25,16 @@ import org.apache.kylin.common.util.BytesUtil;
 /**
  */
 public class FactDistinctColumnPartitioner extends Partitioner<Text, Text> {
-    private Configuration conf;
-
     @Override
     public int getPartition(Text key, Text value, int numReduceTasks) {
-
         if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_HLL) {
             // the last reducer is for merging hll
             return numReduceTasks - 1;
         } else if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_PARTITION_COL) {
-            // the last reducer is for merging hll
+            // the last but one reducer is for partition col
             return numReduceTasks - 2;
         } else {
-            int colIndex = BytesUtil.readUnsigned(key.getBytes(), 0, 1);
-            return colIndex;
+            return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
         }
-
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 6603728..27f8c19 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -78,11 +78,27 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             CubeInstance cube = cubeMgr.getCube(cubeName);
             List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
 
+            int reducerCount = columnsNeedDict.size();
+            int uhcReducerCount = cube.getConfig().getUHCReducerCount();
+
+            int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
+            for(int index : uhcIndex) {
+                if(index == 1) {
+                    reducerCount += uhcReducerCount - 1;
+                }
+            }
+
+            // FactDistinctColumnPartitioner reads the reducer index from one unsigned key byte
+            if (reducerCount > 255) {
+                throw new IOException("The max reducer number for FactDistinctColumnsJob is 255, please decrease the 'kylin.job.uhc.reducer.count' ");
+            }
+
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
+
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());
@@ -101,7 +117,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
                 System.out.println("Found segment " + segment);
             }
             setupMapper(cube.getSegmentById(segmentID));
-            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 2 : columnsNeedDict.size());
+            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
 
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 3fa966d..196bf1e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -20,7 +20,9 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -58,6 +60,10 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
     protected CubeJoinedFlatTableEnrich intermediateTableDesc;
     protected int[] dictionaryColumnIndex;
 
+    protected int uhcReducerCount;
+    protected int[] uhcIndex;
+    protected Map<Integer, Integer> columnIndexToReducerBeginId = new HashMap<>();
+
     @Override
     protected void setup(Context context) throws IOException {
         Configuration conf = context.getConfiguration();
@@ -73,7 +79,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
 
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
 
-        intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg),  cubeDesc);
+        intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
         dictionaryColumnIndex = new int[factDictCols.size()];
         for (int i = 0; i < factDictCols.size(); i++) {
             TblColRef colRef = factDictCols.get(i);
@@ -81,6 +87,15 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
             dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
         }
 
+        uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc);
+        uhcReducerCount = cube.getConfig().getUHCReducerCount();
+        int count = 0;
+        for (int i = 0; i < uhcIndex.length; i++) {
+            columnIndexToReducerBeginId.put(i, count * (uhcReducerCount - 1) + i);
+            if (uhcIndex[i] == 1) {
+                count++;
+            }
+        }
     }
 
     protected void handleErrorRecord(String[] record, Exception ex) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index c8624bb..a99b857 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -67,6 +68,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
     private boolean isStatistics = false;
     private boolean isPartitionCol = false;
     private KylinConfig cubeConfig;
+    private int uhcReducerCount;
+    private Map<Integer, Integer> ReducerIdToColumnIndex = new HashMap<>();
+    private int taskId;
+
     protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
 
     @Override
@@ -83,7 +88,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
 
         boolean collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED));
         int numberOfTasks = context.getNumReduceTasks();
-        int taskId = context.getTaskAttemptID().getTaskID().getId();
+        taskId = context.getTaskAttemptID().getTaskID().getId();
+
+        uhcReducerCount = cube.getConfig().getUHCReducerCount();
+        initReducerIdToColumnIndex(config);
 
         if (collectStatistics && (taskId == numberOfTasks - 1)) {
             // hll
@@ -102,11 +110,25 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
             // col
             isStatistics = false;
             isPartitionCol = false;
-            col = columnList.get(taskId);
+            col = columnList.get(ReducerIdToColumnIndex.get(taskId));
             colValues = Lists.newLinkedList();
         }
     }
 
+    private void initReducerIdToColumnIndex(KylinConfig config) throws IOException {
+        int[] uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc);
+        int count = 0;
+        for (int i = 0; i < uhcIndex.length; i++) {
+            ReducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + i, i);
+            if (uhcIndex[i] == 1) {
+                for (int j = 1; j < uhcReducerCount; j++) {
+                    ReducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + j + i, i);
+                }
+                count++;
+            }
+        }
+    }
+
     @Override
     public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 
@@ -153,10 +175,16 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
         final Configuration conf = context.getConfiguration();
         final FileSystem fs = FileSystem.get(conf);
         final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
-        final Path outputFile = new Path(outputPath, col.getName());
+        final Path colDir = new Path(outputPath, col.getName());
+        final String fileName = col.getName() + "-" + taskId % uhcReducerCount;
+        final Path outputFile = new Path(colDir, fileName);
 
         FSDataOutputStream out = null;
         try {
+            if (!fs.exists(colDir)) {
+                fs.mkdirs(colDir);
+            }
+
             if (fs.exists(outputFile)) {
                 out = fs.append(outputFile);
                 logger.info("append file " + outputFile);
@@ -189,7 +217,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
                 grandTotal += hll.getCountEstimate();
             }
             double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
-            
+
             int mapperNumber = baseCuboidRowCountInMappers.size();
 
             writeMapperAndCuboidStatistics(context); // for human check

http://git-wip-us.apache.org/repos/asf/kylin/blob/fbd21321/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 86ef487..e06dafb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -141,7 +141,17 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
                 if (fieldValue == null)
                     continue;
                 int offset = keyBuffer.position();
-                keyBuffer.put(Bytes.toBytes(i)[3]); // one byte is enough
+
+                int reducerIndex;
+                if (uhcIndex[i] == 0) {
+                    //for the normal dictionary column
+                    reducerIndex = columnIndexToReducerBeginId.get(i);
+                } else {
+                    //for the uhc
+                    reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
+                }
+
+                keyBuffer.put(Bytes.toBytes(reducerIndex)[3]);
                 keyBuffer.put(Bytes.toBytes(fieldValue));
                 outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
                 context.write(outputKey, EMPTY_TEXT);


[3/9] kylin git commit: minor, fix PrintHBaseConfig checkstyle

Posted by sh...@apache.org.
minor, fix PrintHBaseConfig checkstyle


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/50059e38
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/50059e38
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/50059e38

Branch: refs/heads/KYLIN-2135
Commit: 50059e388ea59c42fb9eeea2d1d92927114bcdbd
Parents: 03f3857
Author: Li Yang <li...@apache.org>
Authored: Thu Nov 3 14:14:05 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Nov 3 14:15:03 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/50059e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java
index 634ebdf..f9b7daf 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PrintHBaseConfig.java
@@ -33,7 +33,7 @@ public class PrintHBaseConfig {
         MyConfig config = new MyConfig(HBaseConfiguration.create());
         
         if (args.length == 0) {
-            for (Map.Entry<Object,Object> item: config.getProps().entrySet()) {
+            for (Map.Entry<Object, Object> item : config.getProps().entrySet()) {
                 System.out.println(item.getKey() + "=" + item.getValue());
             }
             System.exit(0);


[2/9] kylin git commit: catch up with cleanup jobs

Posted by sh...@apache.org.
catch up with cleanup jobs


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/03f38579
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/03f38579
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/03f38579

Branch: refs/heads/KYLIN-2135
Commit: 03f385795fd11daaef637cd9be837158cebc2c9c
Parents: b80762c
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Nov 3 14:11:34 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Nov 3 14:11:44 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/tool/ExtendCubeToHybridCLI.java   |  2 +-
 .../org/apache/kylin/tool/MetadataCleanupJob.java  |  2 +-
 .../org/apache/kylin/tool/StorageCleanupJob.java   | 17 +++++------------
 3 files changed, 7 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/03f38579/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
index 27fa973..dbf367f 100644
--- a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
@@ -170,7 +170,7 @@ public class ExtendCubeToHybridCLI {
         CubeDesc newCubeDesc = CubeDesc.getCopyOf(cubeDesc);
         newCubeDesc.setName(newCubeDescName);
         newCubeDesc.updateRandomUuid();
-        newCubeDesc.init(kylinConfig, metadataManager.getAllTablesMap());
+        newCubeDesc.init(kylinConfig);
         newCubeDesc.setPartitionDateEnd(partitionDate);
         newCubeDesc.calculateSignature();
         cubeDescManager.createCubeDesc(newCubeDesc);

http://git-wip-us.apache.org/repos/asf/kylin/blob/03f38579/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java
index 94962ff..7040dbb 100644
--- a/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java
@@ -54,7 +54,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
 
     private KylinConfig config = null;
 
-    public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000L; // 2 days
+    public static final long TIME_THREADSHOLD = 1 * 3600 * 1000L; // 1 hour
     public static final long TIME_THREADSHOLD_FOR_JOB = 30 * 24 * 3600 * 1000L; // 30 days
 
     /*

http://git-wip-us.apache.org/repos/asf/kylin/blob/03f38579/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
index 2a2d1f3..3f82e94 100644
--- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -35,7 +35,6 @@ import javax.annotation.Nullable;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -71,10 +70,10 @@ public class StorageCleanupJob extends AbstractApplication {
 
     @SuppressWarnings("static-access")
     protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
-    protected static final Option OPTION_FORCE = OptionBuilder.withArgName("force").hasArg().isRequired(false).withDescription("Warning: will delete any intermediate hive tables").create("force");
+    protected static final Option OPTION_FORCE = OptionBuilder.withArgName("force").hasArg().isRequired(false).withDescription("Warning: will delete all kylin intermediate hive tables").create("force");
 
     protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
-    public static final int TIME_THRESHOLD_DELETE_HTABLE = 10; // Unit minute
+    public static final int deleteTimeout = 10; // Unit minute
 
     protected boolean delete = false;
     protected boolean force = false;
@@ -82,7 +81,6 @@ public class StorageCleanupJob extends AbstractApplication {
 
     private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
         CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        long TIME_THREADSHOLD = KylinConfig.getInstanceFromEnv().getStorageCleanupTimeThreshold();
         // get all kylin hbase tables
         HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
         String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
@@ -90,14 +88,9 @@ public class StorageCleanupJob extends AbstractApplication {
         List<String> allTablesNeedToBeDropped = new ArrayList<String>();
         for (HTableDescriptor desc : tableDescriptors) {
             String host = desc.getValue(IRealizationConstants.HTableTag);
-            String creationTime = desc.getValue(IRealizationConstants.HTableCreationTime);
             if (KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix().equalsIgnoreCase(host)) {
                 //only take care htables that belongs to self, and created more than 2 days
-                if (StringUtils.isEmpty(creationTime) || (System.currentTimeMillis() - Long.valueOf(creationTime) > TIME_THREADSHOLD)) {
-                    allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
-                } else {
-                    logger.info("Exclude table " + desc.getTableName().getNameAsString() + " from drop list, as it is newly created");
-                }
+                allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
             }
         }
 
@@ -119,9 +112,9 @@ public class StorageCleanupJob extends AbstractApplication {
                 FutureTask futureTask = new FutureTask(new DeleteHTableRunnable(hbaseAdmin, htableName));
                 executorService.execute(futureTask);
                 try {
-                    futureTask.get(TIME_THRESHOLD_DELETE_HTABLE, TimeUnit.MINUTES);
+                    futureTask.get(deleteTimeout, TimeUnit.MINUTES);
                 } catch (TimeoutException e) {
-                    logger.warn("It fails to delete htable " + htableName + ", for it cost more than " + TIME_THRESHOLD_DELETE_HTABLE + " minutes!");
+                    logger.warn("It fails to delete htable " + htableName + ", for it cost more than " + deleteTimeout + " minutes!");
                     futureTask.cancel(true);
                 } catch (Exception e) {
                     e.printStackTrace();


[9/9] kylin git commit: KYLIN-2135 minor format update

Posted by sh...@apache.org.
KYLIN-2135 minor format update

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0a20a9b0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0a20a9b0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0a20a9b0

Branch: refs/heads/KYLIN-2135
Commit: 0a20a9b093807dac60f758de9a3f6fc0a6d72a62
Parents: fbd2132
Author: shaofengshi <sh...@apache.org>
Authored: Thu Nov 3 18:49:50 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Nov 4 13:40:48 2016 +0800

----------------------------------------------------------------------
 .../kylin/engine/mr/DFSFileTableReader.java     | 92 ++++++++++----------
 .../engine/mr/steps/FactDistinctColumnsJob.java | 34 ++++----
 2 files changed, 61 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0a20a9b0/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
index dda1d6f..173c908 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
@@ -23,14 +23,15 @@ import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -57,7 +58,7 @@ public class DFSFileTableReader implements TableReader {
 
     private String filePath;
     private String delim;
-    private List<RowReader> readerList;
+    private List<RowReader> readerList;
 
     private String curLine;
     private String[] curColumns;
@@ -72,33 +73,33 @@ public class DFSFileTableReader implements TableReader {
         this.filePath = filePath;
         this.delim = delim;
         this.expectedColumnNumber = expectedColumnNumber;
-        this.readerList = new ArrayList<RowReader>();
+        this.readerList = new ArrayList<RowReader>();
 
         FileSystem fs = HadoopUtil.getFileSystem(filePath);
 
-        ArrayList<FileStatus> allFiles = new ArrayList<>();
-        FileStatus status = fs.getFileStatus(new Path(filePath));
-        if (status.isFile()) {
-            allFiles.add(status);
-        } else {
-            FileStatus[] listStatus = fs.listStatus(new Path(filePath));
-            allFiles.addAll(Arrays.asList(listStatus));
-        }
-
-        try {
-            for (FileStatus f : allFiles) {
-                RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
-                this.readerList.add(rowReader);
-            }
+        ArrayList<FileStatus> allFiles = new ArrayList<>();
+        FileStatus status = fs.getFileStatus(new Path(filePath));
+        if (status.isFile()) {
+            allFiles.add(status);
+        } else {
+            FileStatus[] listStatus = fs.listStatus(new Path(filePath));
+            allFiles.addAll(Arrays.asList(listStatus));
+        }
+
+        try {
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         } catch (IOException e) {
             if (isExceptionSayingNotSeqFile(e) == false)
                 throw e;
 
-            this.readerList = new ArrayList<RowReader>();
-            for (FileStatus f : allFiles) {
-                RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
-                this.readerList.add(rowReader);
-            }
+            this.readerList = new ArrayList<RowReader>();
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         }
     }
 
@@ -114,20 +115,20 @@ public class DFSFileTableReader implements TableReader {
 
     @Override
     public boolean next() throws IOException {
-        int curReaderIndex = -1;
-        RowReader curReader;
-
-        while (++curReaderIndex < readerList.size()) {
-            curReader = readerList.get(curReaderIndex);
-            curLine = curReader.nextLine();
-            curColumns = null;
-
-            if (curLine != null) {
-                return true;
-            }
-        }
-
-        return false;
+        int curReaderIndex = -1;
+        RowReader curReader;
+
+        while (++curReaderIndex < readerList.size()) {
+            curReader = readerList.get(curReaderIndex);
+            curLine = curReader.nextLine();
+            curColumns = null;
+
+            if (curLine != null) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
     public String getLine() {
@@ -176,15 +177,10 @@ public class DFSFileTableReader implements TableReader {
     }
 
     @Override
-    public void close() {
-        for (RowReader reader : readerList) {
-            try {
-                if (reader != null)
-                    reader.close();
-            } catch (IOException e) {
-                logger.warn("close file failed:", e);
-            }
-        }
+    public void close() {
+        for (RowReader reader : readerList) {
+            IOUtils.closeQuietly(reader);
+        }
     }
 
     private String autoDetectDelim(String line) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a20a9b0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 27f8c19..c34ff94 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -78,27 +78,27 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             CubeInstance cube = cubeMgr.getCube(cubeName);
             List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
 
-            int reducerCount = columnsNeedDict.size();
-            int uhcReducerCount = cube.getConfig().getUHCReducerCount();
-
-            int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
-            for(int index : uhcIndex) {
-                if(index == 1) {
-                    reducerCount += uhcReducerCount - 1;
-                }
-            }
-
-            if (reducerCount > 255) {
-                throw new IOException("The max reducer number for FactDistinctColumnsJob is 255, please decrease the 'kylin.job.global.dictionary.column.reducer.count' ");
-            }
-
-
+            int reducerCount = columnsNeedDict.size();
+            int uhcReducerCount = cube.getConfig().getUHCReducerCount();
+
+            int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
+            for(int index : uhcIndex) {
+                if(index == 1) {
+                    reducerCount += uhcReducerCount - 1;
+                }
+            }
+
+            if (reducerCount > 255) {
+                throw new IllegalArgumentException("The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 'kylin.job.uhc.reducer.count'");
+            }
+
+
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
-
+
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());
@@ -117,7 +117,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
                 System.out.println("Found segment " + segment);
             }
             setupMapper(cube.getSegmentById(segmentID));
-            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
+            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
 
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 


[4/9] kylin git commit: minor, update default engines in cloned cube

Posted by sh...@apache.org.
minor, update default engines in cloned cube


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5da63c23
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5da63c23
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5da63c23

Branch: refs/heads/KYLIN-2135
Commit: 5da63c233fdee07b11d8e1e7886b50eba0b56f31
Parents: 50059e3
Author: Li Yang <li...@apache.org>
Authored: Thu Nov 3 16:39:27 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Nov 3 16:42:51 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/rest/controller/CubeController.java   | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5da63c23/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index c70b506..10cd1f2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -347,7 +348,11 @@ public class CubeController extends BasicController {
 
         CubeDesc cubeDesc = cube.getDescriptor();
         CubeDesc newCubeDesc = CubeDesc.getCopyOf(cubeDesc);
+        
+        KylinConfig config = cubeService.getConfig();
         newCubeDesc.setName(newCubeName);
+        newCubeDesc.setEngineType(config.getDefaultCubeEngine());
+        newCubeDesc.setStorageType(config.getDefaultStorageEngine());
 
         CubeInstance newCube;
         try {


[6/9] kylin git commit: minor, clearer log on 'Value not exist!' error

Posted by sh...@apache.org.
minor, clearer log on 'Value not exist!' error


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/235b123d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/235b123d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/235b123d

Branch: refs/heads/KYLIN-2135
Commit: 235b123d09c629b685cbaf029f205bbeab887d6f
Parents: 1395955
Author: Li Yang <li...@apache.org>
Authored: Thu Nov 3 18:21:55 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Nov 3 18:23:01 2016 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/kylin/common/util/Dictionary.java   | 2 +-
 .../java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java | 8 +++++++-
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/235b123d/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
index 86ad5ff..0fb299c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
@@ -159,7 +159,7 @@ abstract public class Dictionary<T> implements Serializable {
         else {
             int id = getIdFromValueBytesImpl(value, offset, len, roundingFlag);
             if (id < 0)
-                throw new IllegalArgumentException("Value not exists!");
+                throw new IllegalArgumentException("Value '" + Bytes.toString(value, offset, len) + "' (" + Bytes.toStringBinary(value, offset, len) + ") not exists!");
             return id;
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/235b123d/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index f95cc21..aeeb893 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -118,7 +118,13 @@ public class CubeCodeSystem implements IGTCodeSystem {
             if (dictEnc.getRoundingFlag() != roundingFlag) {
                 serializer = dictEnc.copy(roundingFlag).asDataTypeSerializer();
             }
-            serializer.serialize(value, buf);
+            try {
+                serializer.serialize(value, buf);
+            } catch (IllegalArgumentException ex) {
+                IllegalArgumentException rewordEx = new IllegalArgumentException("Column " + col + " value '" + value + "' met dictionary error: " + ex.getMessage());
+                rewordEx.setStackTrace(ex.getStackTrace());
+                throw rewordEx;
+            }
         } else {
             if (value instanceof String) {
                 // for dimensions; measures are converted by MeasureIngestor before reaching this point