Posted to commits@kylin.apache.org by sh...@apache.org on 2018/08/20 10:48:06 UTC

[kylin] branch master updated (55d8ada -> c9d7f5e)

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from 55d8ada  KYLIN-3424 invoke addCubingGarbageCollectionSteps in the cleanup step for HBaseMROutput2Transition
     new 76c9c96  KYLIN-3446 Connect to HBase out of Spark
     new c9337cb  KYLIN-3489 improve the efficiency of enumerating dictionary values by pre-order visiting
     new c9d7f5e  KYLIN-3490 introduce DictionaryEnumerator to answer single encoded column related queries which will not hit cuboid

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../org/apache/kylin/common/util/Dictionary.java   |  11 ++
 .../java/org/apache/kylin/cube/model/CubeDesc.java |  81 +++++++-----
 .../java/org/apache/kylin/dict/TrieDictionary.java |  51 ++++++++
 .../org/apache/kylin/dict/TrieDictionaryTest.java  |  30 +++++
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |   4 +
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |   3 +-
 .../kylin/engine/mr/common/BatchConstants.java     |   2 +-
 .../org/apache/kylin/query/ITKylinQueryTest.java   |  12 ++
 .../query01.sql                                    |   9 +-
 .../query02.sql                                    |   3 +-
 .../query03.sql}                                   |   4 +-
 .../query/enumerator/DictionaryEnumerator.java     | 142 +++++++++++++++++++++
 .../apache/kylin/query/enumerator/OLAPQuery.java   |   5 +-
 .../apache/kylin/query/relnode/OLAPTableScan.java  |   3 +
 .../org/apache/kylin/query/schema/OLAPTable.java   |   4 +
 .../kylin/storage/hbase/steps/CreateHTableJob.java |  68 ++++++++--
 .../kylin/storage/hbase/steps/HBaseJobSteps.java   |  28 ++--
 .../kylin/storage/hbase/steps/HBaseSparkSteps.java |   3 +-
 .../kylin/storage/hbase/steps/SparkCubeHFile.java  |  29 ++---
 20 files changed, 411 insertions(+), 85 deletions(-)
 copy kylin-it/src/test/resources/query/{sql_lookup => sql_dict_enumerator}/query01.sql (92%)
 copy kylin-it/src/test/resources/query/{tableau_probing => sql_dict_enumerator}/query02.sql (94%)
 copy kylin-it/src/test/resources/query/{sql_derived/query06.sql => sql_dict_enumerator/query03.sql} (94%)
 create mode 100644 query/src/main/java/org/apache/kylin/query/enumerator/DictionaryEnumerator.java


[kylin] 02/03: KYLIN-3489 improve the efficiency of enumerating dictionary values by pre-order visiting

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit c9337cb4a5e6c4f04c10b7fde1d5e3f6a8ffa25b
Author: Zhong <nj...@apache.org>
AuthorDate: Wed Aug 15 16:04:30 2018 +0800

    KYLIN-3489 improve the efficiency of enumerating dictionary values by pre-order visiting
---
 .../org/apache/kylin/common/util/Dictionary.java   | 11 +++++
 .../java/org/apache/kylin/dict/TrieDictionary.java | 51 ++++++++++++++++++++++
 .../org/apache/kylin/dict/TrieDictionaryTest.java  | 30 +++++++++++++
 3 files changed, 92 insertions(+)

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
index 7c1e8ab..5c47399 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
@@ -24,11 +24,14 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
+import java.util.List;
 
 import org.apache.kylin.common.KylinConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 /**
  * A bi-way dictionary that maps from dimension/column values to IDs and vice
  * versa. By storing IDs instead of real values, the size of cube is
@@ -170,6 +173,14 @@ abstract public class Dictionary<T> implements Serializable {
 
     abstract public void dump(PrintStream out);
 
+    public List<T> enumeratorValues() {
+        List<T> ret = Lists.newArrayListWithExpectedSize(getSize());
+        for (int i = getMinId(); i <= getMaxId(); i++) {
+            ret.add(getValueFromId(i));
+        }
+        return ret;
+    }
+
     public int nullId() {
         return NULL_ID[getSizeOfId()];
     }
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
index d531c05..8303bb0 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
@@ -28,6 +28,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.PrintStream;
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
@@ -36,7 +37,9 @@ import org.apache.kylin.common.util.Dictionary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * A dictionary based on Trie data structure that maps enumerations of byte[] to
@@ -359,6 +362,54 @@ public class TrieDictionary<T> extends CacheDictionary<T> {
     }
 
     @Override
+    public List<T> enumeratorValues() {
+        List<T> result = Lists.newArrayListWithExpectedSize(getSize());
+        byte[] buf = new byte[maxValueLength];
+        visitNode(headSize, buf, 0, result);
+        return result;
+    }
+
+    @VisibleForTesting
+    List<T> enumeratorValuesByParent() {
+        return super.enumeratorValues();
+    }
+
+    /**
+     * Visit the trie tree in pre-order
+     * @param n           -- the offset of the current node in trieBytes
+     * @param returnValue -- the buffer that accumulated value bytes are written into
+     */
+    private void visitNode(int n, byte[] returnValue, int offset, List<T> result) {
+        int o = offset;
+
+        // write current node value
+        int p = n + firstByteOffset;
+        int len = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
+        System.arraycopy(trieBytes, p, returnValue, o, len);
+        o += len;
+
+        // check whether a value ends at this node
+        boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
+        if (isEndOfValue) {
+            T curNodeValue = bytesConvert.convertFromBytes(returnValue, 0, o);
+            result.add(curNodeValue);
+        }
+
+        // find a child to continue
+        int c = getChildOffset(n);
+        if (c == headSize) // has no children
+            return;
+        while (true) {
+            visitNode(c, returnValue, o, result);
+            if (checkFlag(c, BIT_IS_LAST_CHILD))
+                return;
+            // go to next child
+            p = c + firstByteOffset;
+            c = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1);
+        }
+    }
+
+    @Override
     public void dump(PrintStream out) {
         out.println("Total " + nValues + " values");
         for (int i = 0; i < nValues; i++) {
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryTest.java
index 600b072..c873035 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryTest.java
@@ -35,13 +35,18 @@ import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+
 public class TrieDictionaryTest {
 
     public static void main(String[] args) throws Exception {
@@ -209,6 +214,31 @@ public class TrieDictionaryTest {
     }
 
     @Test
+    public void testEnumeratorValues() throws Exception {
+        testEnumeratorValues("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt");
+        testEnumeratorValues("src/test/resources/dict/dw_category_grouping_names.dat");
+    }
+
+    private void testEnumeratorValues(String file) throws Exception {
+        InputStream is = new FileInputStream(file);
+        ArrayList<String> str = loadStrings(is);
+        TrieDictionaryBuilder<String> b = newDictBuilder(str);
+        TrieDictionary<String> dict = b.build(0);
+        System.out.println("Dictionary size for file " + file + " is " + dict.getSize());
+
+        Stopwatch sw = new Stopwatch();
+        sw.start();
+        List<String> values1 = dict.enumeratorValuesByParent();
+        System.out.println("By iterating id visit the time cost " + sw.elapsed(TimeUnit.MILLISECONDS) + " ms");
+        sw.reset();
+        sw.start();
+        List<String> values2 = dict.enumeratorValues();
+        System.out.println("By pre-order visit the time cost " + sw.elapsed(TimeUnit.MILLISECONDS) + " ms");
+        sw.stop();
+        assertEquals(Sets.newHashSet(values1), Sets.newHashSet(values2));
+    }
+
+    @Test
     public void englishWordsTest() throws Exception {
         InputStream is = new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt");
         ArrayList<String> str = loadStrings(is);


[kylin] 01/03: KYLIN-3446 Connect to HBase out of Spark

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 76c9c960be542c919301c72b34c7ae5ce6f1ec1c
Author: Yichen Zhou <zh...@gmail.com>
AuthorDate: Wed Aug 8 09:53:29 2018 +0800

    KYLIN-3446 Connect to HBase out of Spark
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  4 ++
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |  3 +-
 .../kylin/engine/mr/common/BatchConstants.java     |  2 +-
 .../kylin/storage/hbase/steps/CreateHTableJob.java | 68 ++++++++++++++++++----
 .../kylin/storage/hbase/steps/HBaseJobSteps.java   | 28 ++++++---
 .../kylin/storage/hbase/steps/HBaseSparkSteps.java |  3 +-
 .../kylin/storage/hbase/steps/SparkCubeHFile.java  | 29 +++++----
 7 files changed, 97 insertions(+), 40 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 649b4c3..c6abf16 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -311,6 +311,10 @@ public class JobBuilderSupport {
         return getOptimizationRootPath(jobId) + "/cuboid/";
     }
 
+    public String getHBaseConfFilePath(String jobId) {
+       return getJobWorkingDir(jobId) + "/hbase-conf.xml";
+    }
+
     // ============================================================================
     // static methods also shared by other job flow participant
     // ----------------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 329dd56..2976080 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -129,7 +129,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             .create(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID);
     protected static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL)
             .hasArg().isRequired(true).withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
-
+    public static final Option OPTION_HBASE_CONF_PATH = OptionBuilder.withArgName(BatchConstants.ARG_HBASE_CONF_PATH).hasArg()
+            .isRequired(true).withDescription("HBase config file path").create(BatchConstants.ARG_HBASE_CONF_PATH);
 
     private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index a4a52ad..6fe55e2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -104,7 +104,7 @@ public interface BatchConstants {
     String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID";
     String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots";
     String ARG_META_URL = "metadataUrl";
-
+    String ARG_HBASE_CONF_PATH = "hbaseConfPath";
     /**
      * logger and counter
      */
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 68aa172..5e17a4c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -26,12 +26,17 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
@@ -46,6 +51,7 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.CuboidShardUtil;
+import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +69,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
     CubeDesc cubeDesc = null;
     String segmentID = null;
     String cuboidModeName = null;
+    String hbaseConfPath = null;
     KylinConfig kylinConfig;
     Path partitionFilePath;
 
@@ -74,6 +81,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_PARTITION_FILE_PATH);
         options.addOption(OPTION_CUBOID_MODE);
+        options.addOption(OPTION_HBASE_CONF_PATH);
         parseOptions(options, args);
 
         partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
@@ -85,11 +93,12 @@ public class CreateHTableJob extends AbstractHadoopJob {
         kylinConfig = cube.getConfig();
         segmentID = getOptionValue(OPTION_SEGMENT_ID);
         cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
+        hbaseConfPath = getOptionValue(OPTION_HBASE_CONF_PATH);
         CubeSegment cubeSegment = cube.getSegmentById(segmentID);
 
         byte[][] splitKeys;
         Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap();
-        
+
         // for cube planner, will keep cuboidSizeMap unchanged if cube planner is disabled
         Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName);
         if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
@@ -104,14 +113,35 @@ public class CreateHTableJob extends AbstractHadoopJob {
             }
             cuboidSizeMap = optimizedCuboidSizeMap;
         }
-        
+
         splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment,
                 partitionFilePath.getParent());
 
         CubeHTableUtil.createHTable(cubeSegment, splitKeys);
+        exportHBaseConfiguration(cubeSegment.getStorageLocationIdentifier());
         return 0;
     }
 
+    private void exportHBaseConfiguration(String hbaseTableName) throws Exception {
+
+        Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
+        HadoopUtil.healSickConfig(hbaseConf);
+        Job job = Job.getInstance(hbaseConf, hbaseTableName);
+        HTable table = new HTable(hbaseConf, hbaseTableName);
+        HFileOutputFormat2.configureIncrementalLoadMap(job, table);
+
+        logger.info("Saving HBase configuration to " + hbaseConfPath);
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        FSDataOutputStream out = null;
+        try {
+            out = fs.create(new Path(hbaseConfPath));
+            job.getConfiguration().writeXml(out);
+        } catch (IOException e) {
+            throw new ExecuteException("Write hbase configuration failed", e);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
 
     //one region for one shard
     private static byte[][] getSplitsByRegionCount(int regionCount) {
@@ -124,7 +154,9 @@ public class CreateHTableJob extends AbstractHadoopJob {
         return result;
     }
 
-    public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap, final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder) throws IOException {
+    public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap,
+            final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder)
+            throws IOException {
 
         final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
         float cut = cubeDesc.getConfig().getKylinHBaseRegionCut();
@@ -157,7 +189,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
             }
 
             if (nRegion != original) {
-                logger.info("Region count is adjusted from " + original + " to " + nRegion + " to help random sharding");
+                logger.info(
+                        "Region count is adjusted from " + original + " to " + nRegion + " to help random sharding");
             }
         }
 
@@ -188,10 +221,13 @@ public class CreateHTableJob extends AbstractHadoopJob {
                 }
 
                 if (shardNum > nRegion) {
-                    logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions, reduce to %d", cuboidId, estimatedSize, shardNum, nRegion));
+                    logger.info(
+                            String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions, reduce to %d",
+                                    cuboidId, estimatedSize, shardNum, nRegion));
                     shardNum = nRegion;
                 } else {
-                    logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions", cuboidId, estimatedSize, shardNum));
+                    logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions", cuboidId,
+                            estimatedSize, shardNum));
                 }
 
                 cuboidShards.put(cuboidId, (short) shardNum);
@@ -204,7 +240,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
             }
 
             for (int i = 0; i < nRegion; ++i) {
-                logger.info(String.format("Region %d's estimated size is %.2f MB, accounting for %.2f percent", i, regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
+                logger.info(String.format("Region %d's estimated size is %.2f MB, accounting for %.2f percent", i,
+                        regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
             }
 
             CuboidShardUtil.saveCuboidShards(cubeSegment, cuboidShards, nRegion);
@@ -222,7 +259,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
                 if (size >= mbPerRegion || (size + cubeSizeMap.get(cuboidId)) >= mbPerRegion * 1.2) {
                     // if the size already bigger than threshold, or it will exceed by 20%, cut for next region
                     regionSplit.add(cuboidId);
-                    logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId + " (" + cuboidCount + ") cuboids");
+                    logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId
+                            + " (" + cuboidCount + ") cuboids");
                     size = 0;
                     cuboidCount = 0;
                     regionIndex++;
@@ -240,7 +278,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
         }
     }
 
-    protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion, final Path outputFolder, final KylinConfig kylinConfig) throws IOException {
+    protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion,
+            final Path outputFolder, final KylinConfig kylinConfig) throws IOException {
 
         if (outputFolder == null) {
             logger.warn("outputFolder for hfile split file is null, skip inner region split");
@@ -300,7 +339,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
                     logger.info(String.format("Region %d's hfile %d size is %.2f mb", i, j, accumulatedSize));
                     byte[] split = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN];
                     BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN);
-                    System.arraycopy(Bytes.toBytes(cuboid), 0, split, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
+                    System.arraycopy(Bytes.toBytes(cuboid), 0, split, RowConstants.ROWKEY_SHARDID_LEN,
+                            RowConstants.ROWKEY_CUBOIDID_LEN);
                     splits.add(split);
                     accumulatedSize = 0;
                     j++;
@@ -310,11 +350,15 @@ public class CreateHTableJob extends AbstractHadoopJob {
 
         }
 
-        SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf, SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class), SequenceFile.Writer.valueClass(NullWritable.class));
+        SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf,
+                SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class),
+                SequenceFile.Writer.valueClass(NullWritable.class));
 
         for (int i = 0; i < splits.size(); i++) {
             //when we compare the rowkey, we compare the row firstly.
-            hfilePartitionWriter.append(new RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()), NullWritable.get());
+            hfilePartitionWriter.append(
+                    new RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()),
+                    NullWritable.get());
         }
         hfilePartitionWriter.close();
     }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
index 4fda139..e48090d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
@@ -59,8 +59,10 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
         StringBuilder cmd = new StringBuilder();
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION,
+                getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_HBASE_CONF_PATH, getHBaseConfFilePath(jobId));
 
         createHtableStep.setJobParams(cmd.toString());
         createHtableStep.setJobClass(CreateHTableJob.class);
@@ -69,7 +71,8 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
     }
 
     // TODO make it abstract
-    public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID, Class<? extends AbstractHadoopJob> clazz) {
+    public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments,
+            String jobID, Class<? extends AbstractHadoopJob> clazz) {
         final List<String> mergingCuboidPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
@@ -86,7 +89,8 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, formattedPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
 
         mergeCuboidDataStep.setMapReduceParams(cmd.toString());
         mergeCuboidDataStep.setMapReduceJobClass(clazz);
@@ -148,8 +152,10 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
     }
 
     public List<String> getMergingHTables() {
-        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
+        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
+                .getMergingSegments((CubeSegment) seg);
+        Preconditions.checkState(mergingSegments.size() > 1,
+                "there should be more than 2 segments to merge, target segment " + seg);
         final List<String> mergingHTables = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingHTables.add(merging.getStorageLocationIdentifier());
@@ -158,8 +164,10 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
     }
 
     public List<String> getMergingHDFSPaths() {
-        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
+        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
+                .getMergingSegments((CubeSegment) seg);
+        Preconditions.checkState(mergingSegments.size() > 1,
+                "there should be more than 2 segments to merge, target segment " + seg);
         final List<String> mergingHDFSPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingHDFSPaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
@@ -180,11 +188,13 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
     }
 
     public String getHFilePath(String jobId) {
-        return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
+        return HBaseConnection.makeQualifiedPathInHBaseCluster(
+                getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
     }
 
     public String getRowkeyDistributionOutputPath(String jobId) {
-        return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
+        return HBaseConnection.makeQualifiedPathInHBaseCluster(
+                getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
     }
 
     public void addOptimizeGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index 622a0e8..be230f0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -22,6 +22,7 @@ import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.spark.SparkBatchCubingJobBuilder2;
 import org.apache.kylin.engine.spark.SparkExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -48,7 +49,7 @@ public class HBaseSparkSteps extends HBaseJobSteps {
         sparkExecutable.setParam(SparkCubeHFile.OPTION_OUTPUT_PATH.getOpt(), getHFilePath(jobId));
         sparkExecutable.setParam(SparkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(),
                 getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
-
+        sparkExecutable.setParam(AbstractHadoopJob.OPTION_HBASE_CONF_PATH.getOpt(), getHBaseConfFilePath(jobId));
         sparkExecutable.setJobId(jobId);
 
         StringBuilder jars = new StringBuilder();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index fd8459f..c87a739 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -17,7 +17,6 @@
 */
 package org.apache.kylin.storage.hbase.steps;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -30,12 +29,11 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -58,7 +56,6 @@ import org.apache.kylin.engine.mr.common.SerializableConfiguration;
 import org.apache.kylin.engine.spark.KylinSparkJobListener;
 import org.apache.kylin.engine.spark.SparkUtil;
 import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -102,6 +99,7 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
         options.addOption(OPTION_PARTITION_FILE_PATH);
+        options.addOption(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
     }
 
     @Override
@@ -117,11 +115,12 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
         final Path partitionFilePath = new Path(optionsHelper.getOptionValue(OPTION_PARTITION_FILE_PATH));
+        final String hbaseConfFile = optionsHelper.getOptionValue(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
 
         Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), KeyValueCreator.class,
                 KeyValue.class, RowKeyWritable.class };
 
-        SparkConf conf = new SparkConf().setAppName("Covnerting Hfile for:" + cubeName + " segment " + segmentId);
+        SparkConf conf = new SparkConf().setAppName("Converting HFile for:" + cubeName + " segment " + segmentId);
         //serialization conf
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
@@ -171,17 +170,15 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         }
 
         logger.info("There are " + keys.size() + " split keys, totally " + (keys.size() + 1) + " hfiles");
-        Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
-        HadoopUtil.healSickConfig(hbaseConf);
-        Job job = new Job(hbaseConf, cubeSegment.getStorageLocationIdentifier());
-        job.getConfiguration().set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
-        HTable table = new HTable(hbaseConf, cubeSegment.getStorageLocationIdentifier());
-        try {
-            HFileOutputFormat2.configureIncrementalLoadMap(job, table);
-        } catch (IOException ioe) {
-            // this can be ignored.
-            logger.debug(ioe.getMessage(), ioe);
-        }
+
+        //HBase conf
+        logger.info("Loading HBase configuration from:" + hbaseConfFile);
+        FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
+
+        Configuration hbaseJobConf = new Configuration();
+        hbaseJobConf.addResource(confInput);
+        hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
+        Job job = new Job(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
 
         FileOutputFormat.setOutputPath(job, new Path(outputPath));
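
Stepping back, the point of KYLIN-3446 is that the Spark job no longer opens an HBase connection to learn the HFile layout: the driver-side CreateHTableJob snapshots a fully resolved Configuration to an XML file on HDFS, and SparkCubeHFile rebuilds it from that file. A self-contained sketch of the round-trip, using a hypothetical path and property value (the writeXml/addResource calls match the ones in the hunks above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ConfRoundTrip {
        public static void main(String[] args) throws Exception {
            Path confPath = new Path("/tmp/example/hbase-conf.xml"); // hypothetical location
            FileSystem fs = FileSystem.get(new Configuration());

            // "Driver" side: serialize the resolved configuration to XML.
            Configuration hbaseConf = new Configuration(); // stand-in for the real HBase conf
            hbaseConf.set("hbase.zookeeper.quorum", "zk-host"); // hypothetical value
            try (FSDataOutputStream out = fs.create(confPath)) {
                hbaseConf.writeXml(out);
            }

            // "Executor" side: rebuild it from the XML, no HBase connection needed.
            Configuration reloaded = new Configuration();
            try (FSDataInputStream in = fs.open(confPath)) {
                reloaded.addResource(in);
                System.out.println(reloaded.get("hbase.zookeeper.quorum")); // prints zk-host
            }
        }
    }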
 


[kylin] 03/03: KYLIN-3490 introduce DictionaryEnumerator to answer single encoded column related queries which will not hit cuboid

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit c9d7f5ec2ff8834f0f1b24610485bb819741206f
Author: Zhong <nj...@apache.org>
AuthorDate: Wed Aug 15 16:33:19 2018 +0800

    KYLIN-3490 introduce DictionaryEnumerator to answer single encoded column related queries which will not hit cuboid
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../java/org/apache/kylin/cube/model/CubeDesc.java |  81 +++++++-----
 .../org/apache/kylin/query/ITKylinQueryTest.java   |  12 ++
 .../query/sql_dict_enumerator/query01.sql          |  21 +++
 .../query/sql_dict_enumerator/query02.sql          |  20 +++
 .../query/sql_dict_enumerator/query03.sql          |  20 +++
 .../query/enumerator/DictionaryEnumerator.java     | 142 +++++++++++++++++++++
 .../apache/kylin/query/enumerator/OLAPQuery.java   |   5 +-
 .../apache/kylin/query/relnode/OLAPTableScan.java  |   3 +
 .../org/apache/kylin/query/schema/OLAPTable.java   |   4 +
 10 files changed, 276 insertions(+), 36 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index dbf22b5..f154eee 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1227,6 +1227,10 @@ abstract public class KylinConfigBase implements Serializable {
     // QUERY
     // ============================================================================
 
+    public boolean isDictionaryEnumeratorEnabled() {
+        return Boolean.valueOf(getOptional("kylin.query.enable-dict-enumerator", "false"));
+    }
+
     public Boolean isEnumerableRulesEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.query.calcite.enumerable-rules-enabled", "false"));
     }
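
As an aside, the toggle added above defaults to off, so the enumerator only kicks in when explicitly enabled. A sketch of the opt-in, with the property name taken verbatim from the hunk (this line itself is not part of the commit):

    # kylin.properties: answer single dictionary-column queries from the dictionary
    kylin.query.enable-dict-enumerator=true
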
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 5b4a134..95c8b40 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -18,16 +18,28 @@
 
 package org.apache.kylin.cube.model;
 
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.lang.reflect.Method;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.ArrayUtils;
@@ -65,27 +77,16 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Method;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeSet;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /**
  */
@@ -1298,18 +1299,28 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
     }
 
     /**
-     * Get columns that have dictionary
+     * Get dimensions that have dictionary
      */
-    public Set<TblColRef> getAllColumnsHaveDictionary() {
-        Set<TblColRef> result = Sets.newLinkedHashSet();
+    public Set<TblColRef> getAllDimsHaveDictionary() {
+        Set<TblColRef> result = Sets.newHashSet();
 
-        // dictionaries in dimensions
         for (RowKeyColDesc rowKeyColDesc : rowkey.getRowKeyColumns()) {
             TblColRef colRef = rowKeyColDesc.getColRef();
             if (rowkey.isUseDictionary(colRef)) {
                 result.add(colRef);
             }
         }
+        return result;
+    }
+
+    /**
+     * Get columns that have dictionary
+     */
+    public Set<TblColRef> getAllColumnsHaveDictionary() {
+        Set<TblColRef> result = Sets.newLinkedHashSet();
+
+        // dictionaries in dimensions
+        result.addAll(getAllDimsHaveDictionary());
 
         // dictionaries in measures
         for (MeasureDesc measure : measures) {
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index e6afbe0..e01334f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -428,6 +428,18 @@ public class ITKylinQueryTest extends KylinTestBase {
     }
 
     @Test
+    public void testDictionaryEnumerator() throws Exception {
+        boolean ifDictEnumeratorEnabled = config.isDictionaryEnumeratorEnabled();
+        if (!ifDictEnumeratorEnabled) {
+            config.setProperty("kylin.query.enable-dict-enumerator", "true");
+        }
+        batchExecuteQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_dict_enumerator");
+        if (!ifDictEnumeratorEnabled) {
+            config.setProperty("kylin.query.enable-dict-enumerator", "false");
+        }
+    }
+
+    @Test
     public void testValues() throws Exception {
         execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_values", null, true);
     }
diff --git a/kylin-it/src/test/resources/query/sql_dict_enumerator/query01.sql b/kylin-it/src/test/resources/query/sql_dict_enumerator/query01.sql
new file mode 100644
index 0000000..963e3b3
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_dict_enumerator/query01.sql
@@ -0,0 +1,21 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select SLR_SEGMENT_CD
+from TEST_KYLIN_FACT
+group by SLR_SEGMENT_CD
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_dict_enumerator/query02.sql b/kylin-it/src/test/resources/query/sql_dict_enumerator/query02.sql
new file mode 100644
index 0000000..2617e91
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_dict_enumerator/query02.sql
@@ -0,0 +1,20 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select DISTINCT SLR_SEGMENT_CD
+from TEST_KYLIN_FACT
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_dict_enumerator/query03.sql b/kylin-it/src/test/resources/query/sql_dict_enumerator/query03.sql
new file mode 100644
index 0000000..cb07666
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_dict_enumerator/query03.sql
@@ -0,0 +1,20 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select MAX(SLR_SEGMENT_CD)
+from TEST_KYLIN_FACT
\ No newline at end of file
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/DictionaryEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/DictionaryEnumerator.java
new file mode 100644
index 0000000..6af65ee
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/DictionaryEnumerator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.enumerator;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class DictionaryEnumerator implements Enumerator<Object[]> {
+
+    private final static Logger logger = LoggerFactory.getLogger(DictionaryEnumerator.class);
+
+    private List<Dictionary<String>> dictList;
+    private final Object[] current;
+    private final TblColRef dictCol;
+    private final int dictColIdx;
+    private Iterator<String> currentDict;
+    private Iterator<Dictionary<String>> iterator;
+
+    public DictionaryEnumerator(OLAPContext olapContext) {
+        Preconditions.checkArgument(olapContext.allColumns.size() == 1, "The query should only relate to one column");
+
+        dictCol = olapContext.allColumns.iterator().next();
+        Preconditions.checkArgument(ifColumnHaveDictionary(dictCol, olapContext.realization, false),
+                "The column " + dictCol + " should be encoded as dictionary for " + olapContext.realization);
+
+        dictList = getAllDictionaries(dictCol, olapContext.realization);
+        current = new Object[olapContext.returnTupleInfo.size()];
+        dictColIdx = olapContext.returnTupleInfo.getColumnIndex(dictCol);
+
+        reset();
+        logger.info("Will use DictionaryEnumerator to answer query which is only related to column " + dictCol);
+    }
+
+    public static boolean ifDictionaryEnumeratorEligible(OLAPContext olapContext) {
+        if (olapContext.allColumns.size() != 1) {
+            return false;
+        }
+
+        TblColRef dictCol = olapContext.allColumns.iterator().next();
+        if (!ifColumnHaveDictionary(dictCol, olapContext.realization, true)) {
+            return false;
+        }
+        return true;
+    }
+
+    private static boolean ifColumnHaveDictionary(TblColRef col, IRealization realization, boolean enableCheck) {
+        if (realization instanceof CubeInstance) {
+            final CubeInstance cube = (CubeInstance) realization;
+            boolean ifEnabled = !enableCheck || cube.getConfig().isDictionaryEnumeratorEnabled();
+            return ifEnabled && cube.getDescriptor().getAllDimsHaveDictionary().contains(col);
+        } else if (realization instanceof HybridInstance) {
+            final HybridInstance hybridInstance = (HybridInstance) realization;
+            for (IRealization entry : hybridInstance.getRealizations()) {
+                if (!ifColumnHaveDictionary(col, entry, enableCheck)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
+    public static List<Dictionary<String>> getAllDictionaries(TblColRef col, IRealization realization) {
+        Set<Dictionary<String>> result = Sets.newHashSet();
+        if (realization instanceof CubeInstance) {
+            final CubeInstance cube = (CubeInstance) realization;
+            for (CubeSegment segment : cube.getSegments(SegmentStatusEnum.READY)) {
+                result.add(segment.getDictionary(col));
+            }
+        } else if (realization instanceof HybridInstance) {
+            final HybridInstance hybridInstance = (HybridInstance) realization;
+            for (IRealization entry : hybridInstance.getRealizations()) {
+                result.addAll(getAllDictionaries(col, entry));
+            }
+        } else {
+            throw new IllegalStateException("All leaf realizations should be CubeInstance");
+        }
+        return Lists.newArrayList(result);
+    }
+
+    @Override
+    public boolean moveNext() {
+        while (currentDict == null || !currentDict.hasNext()) {
+            if (!iterator.hasNext()) {
+                return false;
+            }
+            final Dictionary<String> dict = iterator.next();
+            currentDict = dict.enumeratorValues().iterator();
+        }
+
+        current[dictColIdx] = Tuple.convertOptiqCellValue(currentDict.next(), dictCol.getDatatype());
+        return true;
+    }
+
+    @Override
+    public Object[] current() {
+        return current;
+    }
+
+    @Override
+    public void reset() {
+        iterator = dictList.iterator();
+    }
+
+    @Override
+    public void close() {
+    }
+}
\ No newline at end of file
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
index 84ac5cf..c094ff5 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
@@ -37,7 +37,8 @@ public class OLAPQuery extends AbstractEnumerable<Object[]> implements Enumerabl
     public enum EnumeratorTypeEnum {
         OLAP, //finish query with Cube or II, or a combination of both
         LOOKUP_TABLE, //using a snapshot of lookup table
-        HIVE //using hive
+        HIVE, //using hive
+        COL_DICT // using a column's dictionary
     }
 
     private final DataContext optiqContext;
@@ -65,6 +66,8 @@ public class OLAPQuery extends AbstractEnumerable<Object[]> implements Enumerabl
                     : new OLAPEnumerator(olapContext, optiqContext);
         case LOOKUP_TABLE:
             return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator() : new LookupTableEnumerator(olapContext);
+        case COL_DICT:
+            return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator() : new DictionaryEnumerator(olapContext);
         case HIVE:
             return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator() : new HiveEnumerator(olapContext);
         default:
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
index c23f1c5..ac6241f 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
@@ -72,6 +72,7 @@ import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.query.enumerator.DictionaryEnumerator;
 import org.apache.kylin.query.optrule.AggregateMultipleExpandRule;
 import org.apache.kylin.query.optrule.AggregateProjectReduceRule;
 import org.apache.kylin.query.optrule.OLAPAggregateRule;
@@ -419,6 +420,8 @@ public class OLAPTableScan extends TableScan implements OLAPRel, EnumerableRel {
         // if the table to scan is not the fact table of cube, then it's a lookup table
         if (context.realization.getModel().isLookupTable(tableName)) {
             return "executeLookupTableQuery";
+        } else if (DictionaryEnumerator.ifDictionaryEnumeratorEligible(context)) {
+            return "executeColumnDictionaryQuery";
         } else {
             return "executeOLAPQuery";
         }
diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
index 216c6d4..60a856d 100644
--- a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
+++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
@@ -280,6 +280,10 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab
         return new OLAPQuery(optiqContext, EnumeratorTypeEnum.LOOKUP_TABLE, ctxSeq);
     }
 
+    public Enumerable<Object[]> executeColumnDictionaryQuery(DataContext optiqContext, int ctxSeq) {
+        return new OLAPQuery(optiqContext, EnumeratorTypeEnum.COL_DICT, ctxSeq);
+    }
+
     public Enumerable<Object[]> executeHiveQuery(DataContext optiqContext, int ctxSeq) {
         return new OLAPQuery(optiqContext, EnumeratorTypeEnum.HIVE, ctxSeq);
     }
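
Putting the three commits together: a query such as query02.sql above (select DISTINCT SLR_SEGMENT_CD from TEST_KYLIN_FACT) can now be answered without touching any cuboid, by unioning the column's per-segment dictionaries and enumerating each one in pre-order. A hedged sketch of that core loop, reusing the public helpers from the DictionaryEnumerator diff (the wrapper class and method name are illustrative only):

    import java.util.List;
    import java.util.Set;

    import org.apache.kylin.common.util.Dictionary;
    import org.apache.kylin.metadata.model.TblColRef;
    import org.apache.kylin.metadata.realization.IRealization;
    import org.apache.kylin.query.enumerator.DictionaryEnumerator;

    import com.google.common.collect.Sets;

    public class DictAnswerSketch {
        // col and realization would come from the query's OLAPContext.
        static Set<String> distinctValues(TblColRef col, IRealization realization) {
            List<Dictionary<String>> dicts = DictionaryEnumerator.getAllDictionaries(col, realization);
            Set<String> result = Sets.newHashSet();
            for (Dictionary<String> dict : dicts) {
                // enumeratorValues() is the pre-order traversal added in KYLIN-3489.
                result.addAll(dict.enumeratorValues());
            }
            return result;
        }
    }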