You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/01/04 06:05:04 UTC

[kylin] 01/06: KYLIN-4818 Calculate cuboid rowcount via HLL

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit a6f78cd53b951acf3ec966fa537dae3f2b82fd1e
Author: XiaoxiangYu <xx...@apache.org>
AuthorDate: Mon Nov 23 21:44:23 2020 +0800

    KYLIN-4818 Calculate cuboid rowcount via HLL
---
 .../kylin/common/annotation/Clarification.java     |   6 +-
 .../java/org/apache/kylin/cube/CubeSegment.java    |  19 ++-
 .../engine/spark/application/SparkApplication.java |   2 +-
 .../engine/spark/builder/NBuildSourceInfo.java     |  16 +--
 .../kylin/engine/spark/job/CubeBuildJob.java       |  27 ++++-
 .../kylin/engine/spark/job/CuboidAggregator.scala  |   1 +
 .../kylin/engine/spark/job/CuboidStatistics.scala  | 129 +++++++++++++++++++++
 .../engine/spark/job/ParentSourceChooser.scala     |  30 ++++-
 .../spark/job/ResourceDetectBeforeCubingJob.java   |   1 +
 .../kylin/engine/spark/metadata/MetaData.scala     |  21 +++-
 .../engine/spark/metadata/MetadataConverter.scala  |  12 +-
 11 files changed, 234 insertions(+), 30 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/annotation/Clarification.java b/core-common/src/main/java/org/apache/kylin/common/annotation/Clarification.java
index b039e50..4b0ea2f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/annotation/Clarification.java
+++ b/core-common/src/main/java/org/apache/kylin/common/annotation/Clarification.java
@@ -30,9 +30,11 @@ import java.lang.annotation.Target;
         ElementType.CONSTRUCTOR, ElementType.LOCAL_VARIABLE, ElementType.PACKAGE})
 public @interface Clarification {
 
-    Priority priority();
+    Priority priority() default Priority.MINOR;
 
-    String msg() default "N/A";
+    String msg() default "null";
+
+    boolean deprecated() default false; // Please remove deprecated key when Kylin4 GA
 
     enum Priority {
         MINOR,
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index d0224fb..715e684 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -31,6 +31,7 @@ import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.annotation.Clarification;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
@@ -75,8 +76,10 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     @JsonProperty("date_range_end")
     private long dateRangeEnd;
     @JsonProperty("source_offset_start")
+    @Clarification(deprecated = true)
     private long sourceOffsetStart;
     @JsonProperty("source_offset_end")
+    @Clarification(deprecated = true)
     private long sourceOffsetEnd;
     @JsonProperty("status")
     private SegmentStatusEnum status;
@@ -107,6 +110,7 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     private String binarySignature; // a hash of cube schema and dictionary ID, used for sanity check
 
     @JsonProperty("dictionaries")
+    @Clarification(deprecated = true)
     private ConcurrentHashMap<String, String> dictionaries; // table/column ==> dictionary resource path
     @JsonProperty("snapshots")
     private ConcurrentHashMap<String, String> snapshots; // table name ==> snapshot resource path
@@ -116,13 +120,16 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
 
     @JsonProperty("source_partition_offset_start")
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
+    @Clarification(deprecated = true)
     private Map<Integer, Long> sourcePartitionOffsetStart = Maps.newHashMap();
 
     @JsonProperty("source_partition_offset_end")
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
+    @Clarification(deprecated = true)
     private Map<Integer, Long> sourcePartitionOffsetEnd = Maps.newHashMap();
 
     @JsonProperty("stream_source_checkpoint")
+    @Clarification(deprecated = true)
     private String streamSourceCheckpoint;
 
     @JsonProperty("additionalInfo")
@@ -530,11 +537,19 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     }
 
     public String getStatisticsResourcePath() {
-        return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid());
+        return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), ".seq");
+    }
+
+    public String getPreciseStatisticsResourcePath() {
+        return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), ".json");
     }
 
     public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId) {
-        return ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + cubeSegmentId + ".seq";
+        return getStatisticsResourcePath(cubeName, cubeSegmentId, ".seq");
+    }
+
+    public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId, String suffix) {
+        return ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + cubeSegmentId + suffix;
     }
 
     @Override
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 98dde2b..7867474 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -219,7 +219,7 @@ public abstract class SparkApplication {
         jobId = getParam(MetadataConstants.P_JOB_ID);
         project = getParam(MetadataConstants.P_PROJECT_NAME);
         if (getParam(MetadataConstants.P_CUBOID_NUMBER) != null) {
-            layoutSize = Integer.valueOf(getParam(MetadataConstants.P_CUBOID_NUMBER));
+            layoutSize = Integer.parseInt(getParam(MetadataConstants.P_CUBOID_NUMBER));
         }
         try (KylinConfig.SetAndUnsetThreadLocalConfig autoCloseConfig = KylinConfig
                 .setAndUnsetThreadLocalConfig(MetaDumpUtil.loadKylinConfigFromHdfs(hdfsMetalUrl))) {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java
index 4ae87ef..51d7414 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java
@@ -34,7 +34,7 @@ import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 public class NBuildSourceInfo {
     protected static final Logger logger = LoggerFactory.getLogger(NBuildSourceInfo.class);
 
-    private Dataset<Row> flattableDS;
+    private Dataset<Row> flatTableDS;
     private String viewFactTablePath;
     private SparkSession ss;
     private long byteSize;
@@ -52,12 +52,12 @@ public class NBuildSourceInfo {
         this.byteSize = byteSize;
     }
 
-    public void setFlattableDS(Dataset<Row> flattableDS) {
-        this.flattableDS = flattableDS;
+    public void setFlatTableDS(Dataset<Row> flatTableDS) {
+        this.flatTableDS = flatTableDS;
     }
 
-    public Dataset<Row> getFlattableDS() {
-        return flattableDS;
+    public Dataset<Row> getFlatTableDS() {
+        return flatTableDS;
     }
 
     public Dataset<Row> getParentDS() {
@@ -66,9 +66,9 @@ public class NBuildSourceInfo {
             Preconditions.checkNotNull(ss, "SparkSession is null is NBuildSourceInfo.");
             return ss.read().parquet(parentStoragePath);
         } else {
-            Preconditions.checkState(flattableDS != null, "Path and DS can no be empty at the same time.");
-            logger.info("parent storage path not exists, use flattable dataset.");
-            return flattableDS;
+            Preconditions.checkState(flatTableDS != null, "Path and DS can no be empty at the same time.");
+            logger.info("parent storage path not exists, use flatTable dataset.");
+            return flatTableDS;
         }
     }
 
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 38a101b..1028911 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -59,6 +59,8 @@ import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.storage.StorageFactory;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
@@ -69,6 +71,7 @@ import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import org.apache.kylin.shaded.com.google.common.collect.Sets;
 
+import scala.Tuple2;
 import scala.collection.JavaConversions;
 
 public class CubeBuildJob extends SparkApplication {
@@ -113,6 +116,7 @@ public class CubeBuildJob extends SparkApplication {
                 // choose source
                 ParentSourceChooser sourceChooser = new ParentSourceChooser(spanningTree, seg, jobId, ss, config, true);
                 sourceChooser.decideSources();
+                Tuple2<String, AggInfo>[] aggInfos =  sourceChooser.getAggInfo();
                 NBuildSourceInfo buildFromFlatTable = sourceChooser.flatTableSource();
                 Map<Long, NBuildSourceInfo> buildFromLayouts = sourceChooser.reuseSources();
 
@@ -131,7 +135,8 @@ public class CubeBuildJob extends SparkApplication {
                 infos.recordSpanningTree(segId, spanningTree);
 
                 logger.info("Updating segment info");
-                updateSegmentInfo(getParam(MetadataConstants.P_CUBE_ID), seg, buildFromFlatTable.getFlattableDS().count());
+                assert buildFromFlatTable != null;
+                updateSegmentInfo(getParam(MetadataConstants.P_CUBE_ID), seg, buildFromFlatTable.getFlatTableDS().count());
             }
             updateSegmentSourceBytesSize(getParam(MetadataConstants.P_CUBE_ID),
                     ResourceDetectUtils.getSegmentSourceSize(shareDir));
@@ -157,6 +162,26 @@ public class CubeBuildJob extends SparkApplication {
         List<CubeSegment> cubeSegments = Lists.newArrayList();
         CubeSegment segment = cubeCopy.getSegmentById(segmentInfo.id());
         segment.setSizeKB(segmentInfo.getAllLayoutSize() / 1024);
+        List<String> cuboidStatics = new LinkedList<>();
+
+        String template = "{\"cuboid\":%d, \"rows\": %d, \"size\": %d}";
+        for (LayoutEntity layoutEntity : segmentInfo.getAllLayoutJava()) {
+            cuboidStatics.add(String.format(template, layoutEntity.getId(), layoutEntity.getRows(), layoutEntity.getByteSize()));
+        }
+
+        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(ss.sparkContext());
+        JavaRDD<String> cuboidStatRdd = jsc.parallelize(cuboidStatics);
+        for (String cuboid : cuboidStatics) {
+            logger.info("Statistics \t: {}", cuboid);
+        }
+        String path = config.getHdfsWorkingDirectory() + segment.getPreciseStatisticsResourcePath();
+        logger.info("Saving {} {}", path, segmentInfo);
+        try {
+            cuboidStatRdd.saveAsTextFile(path);
+        } catch (Exception e) {
+            logger.error("Error", e);
+        }
+
         segment.setLastBuildTime(System.currentTimeMillis());
         segment.setLastBuildJobID(getParam(MetadataConstants.P_JOB_ID));
         segment.setInputRecords(sourceRowCount);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
index 54afadf..84beaf2 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
@@ -47,6 +47,7 @@ object CuboidAggregator {
     aggInternal(ss, dataSet, dimensions, measures, isSparkSql)
   }
 
+  //noinspection ScalaStyle
   def aggInternal(ss: SparkSession,
                   dataSet: DataFrame,
                   dimensions: util.Set[Integer],
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatistics.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatistics.scala
new file mode 100644
index 0000000..d0ea199
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatistics.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job
+
+
+import org.apache.kylin.engine.spark.metadata.SegmentInfo
+import org.apache.kylin.measure.hllc.HLLCounter
+import org.apache.kylin.shaded.com.google.common.hash.{HashFunction, Hashing}
+import org.apache.spark.sql.{Dataset, Row}
+
+import scala.collection.mutable
+
+
+object CuboidStatistics {
+  /** Builds one AggInfo per layout id of `seg` from `inputDs`, merging per-partition results by key; prints each estimate. */
+  def sample(inputDs: Dataset[Row], seg: SegmentInfo): Array[(String, AggInfo)] = {
+    val layoutIds = seg.getAllLayout.map(x => x.getId)
+    val rkc = seg.allColumns.count(c => c.rowKey)
+    val res = inputDs.rdd //.sample(withReplacement = false, 0.3)
+      .mapPartitions(new CuboidStatistics(layoutIds, rkc).statisticsInPartition)
+    val l = res.map(a => (a.key, a)).reduceByKey((a, b) => a.merge(b)).collect()
+    l.foreach(x => println(x._1 + " >>><<< " + x._2.cuboid.counter.getCountEstimate))
+    l
+  }
+}
+
+/** Accumulates one AggInfo (HLL counter) per cuboid id over the rows of a partition; reads the first `rkc` columns of a row. */
+class CuboidStatistics(ids: List[Long], rkc: Int) extends Serializable {
+  private val info = mutable.Map[String, AggInfo]()
+  val allCuboidsBitSet: Array[Array[Integer]] = getCuboidBitSet(ids, rkc)
+  private val hf: HashFunction = Hashing.murmur3_128
+  private val rowHashCodesLong = new Array[Long](rkc)
+
+  /** Resets the counters, folds every row of `rows` into them and returns an iterator over the AggInfo of each cuboid id. */
+  def statisticsInPartition(rows: Iterator[Row]): Iterator[AggInfo] = {
+    init()
+    rows.foreach(update)
+    info.valuesIterator
+  }
+
+  /** Puts a fresh AggInfo into `info` for every cuboid id, keyed by the id rendered as a string. */
+  def init(): Unit = ids.foreach(i => info.put(i.toString, AggInfo(i.toString)))
+
+  /** Folds one row into the per-cuboid counters; invoked for every input row, so it deliberately prints nothing. */
+  def update(r: Row): Unit = updateCuboid(r)
+
+  /** Hashes each of the first `rkc` columns of `r`, then feeds the summed hash of every cuboid's columns to its counter. */
+  def updateCuboid(r: Row): Unit = {
+    // generate hash for each row key column
+    var idx = 0
+    while (idx < rkc) {
+      // Row.get yields null for a NULL cell; test before toString so the value hashes as "0" instead of throwing.
+      val cell = r.get(idx)
+      val colValue = if (cell == null) "0" else cell.toString
+      //add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+      rowHashCodesLong(idx) = hf.newHasher.putUnencodedChars(colValue).hash().padToLong() + idx
+      idx += 1
+    }
+
+    // use the row key column hash to get a consolidated hash for each cuboid
+    val n = allCuboidsBitSet.length
+    idx = 0
+    while (idx < n) {
+      var value: Long = 0
+      var position = 0
+      while (position < allCuboidsBitSet(idx).length) {
+        value += rowHashCodesLong(allCuboidsBitSet(idx)(position))
+        position += 1
+      }
+      info(ids(idx).toString).cuboid.counter.addHashDirectly(value)
+      idx += 1
+    }
+  }
+
+  /** For each cuboid id, lists in ascending order the indices i in [0, nRowKey) whose bit (nRowKey - 1 - i) is set in the id. */
+  def getCuboidBitSet(cuboidIds: List[Long], nRowKey: Int): Array[Array[Integer]] = {
+    val allCuboidsBitSet: Array[Array[Integer]] = new Array[Array[Integer]](cuboidIds.length)
+    var j: Int = 0
+    while (j < cuboidIds.length) {
+      val cuboidId: Long = cuboidIds(j)
+      allCuboidsBitSet(j) = new Array[Integer](java.lang.Long.bitCount(cuboidId))
+      var mask: Long = 1L << (nRowKey - 1)
+      var position: Int = 0
+      var i: Int = 0
+      while (i < nRowKey) {
+        if ((mask & cuboidId) > 0) {
+          allCuboidsBitSet(j)(position) = i
+          position += 1
+        }
+        mask = mask >> 1
+        i += 1
+      }
+      j += 1
+    }
+    allCuboidsBitSet
+  }
+}
+/** Statistics held for one `key`. NOTE(review): default counter is `new HLLCounter()`, CuboidInfo's is `new HLLCounter(14)` - confirm. */
+case class AggInfo(key: String,
+                   cuboid: CuboidInfo = CuboidInfo(new HLLCounter()),
+                   sample: SampleInfo = SampleInfo(),
+                   dimension: DimensionInfo = DimensionInfo()) {
+  def merge(o: AggInfo): AggInfo = {
+    this.cuboid.counter.merge(o.cuboid.counter) // mutates this counter in place; `sample` and `dimension` are not merged
+    this
+  }
+}
+/** Holds the HLLCounter of one cuboid; defaults to `new HLLCounter(14)` (presumably the HLL precision - confirm against HLLCounter). */
+case class CuboidInfo(counter: HLLCounter = new HLLCounter(14))
+/** Holds an array of strings, by default a new array of length 3; nothing in this file writes to it. */
+case class SampleInfo(data: Array[String] = new Array(3))
+/** Holds a mutable String-to-String map named `range`, empty by default; nothing in this file populates it. */
+case class DimensionInfo(range: mutable.Map[String, String] = mutable.Map[String, String]())
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
index 7697fd6..446f3e6 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
@@ -39,17 +39,25 @@ class ParentSourceChooser(
   config: KylinConfig,
   needEncoding: Boolean) extends Logging {
 
+  var aggInfo : Array[(String, AggInfo)]  = _
+
   // build from built cuboid.
   var reuseSources: java.util.Map[java.lang.Long, NBuildSourceInfo] = Maps.newHashMap()
 
-  // build from flattable.
+  // build from flatTable.
   var flatTableSource: NBuildSourceInfo = _
 
+  var detectStep = false
+
   //TODO: MetadataConverter don't have getCubeDesc() now
 
   /*val flatTableDesc = new CubeJoinedFlatTableDesc(
     MetadataConverter.getCubeDesc(seg.getCube),
     ParentSourceChooser.needJoinLookupTables(seg.getModel, toBuildTree))*/
+  def setDetectStep(): Unit =
+    detectStep = true
+
+  def getAggInfo : Array[(String, AggInfo)] = aggInfo
 
   def decideSources(): Unit = {
     toBuildTree.getRootIndexEntities.asScala.foreach { entity =>
@@ -72,7 +80,19 @@ class ParentSourceChooser(
         builder.checkDupKey()
         seg = builder.buildSnapshot
       }
-      flatTableSource = getFlatTable()
+      flatTableSource = getFlatTable
+
+      val rowKeyColumns: Seq[String] = seg.allColumns.filter(c => c.rowKey).map(c => c.id.toString)
+      if (aggInfo == null && !detectStep) {
+        logInfo("Start  sampling ...")
+        val coreDs = flatTableSource.getFlatTableDS.select(rowKeyColumns.head, rowKeyColumns.tail: _*)
+        aggInfo = CuboidStatistics.sample(coreDs, seg)
+        logInfo("Finish sampling ...")
+        val statisticsStr = aggInfo.sortBy(x => x._1).map(x => x._1 + ":" + x._2.cuboid.counter.getCountEstimate).mkString("\n")
+        logInfo(statisticsStr)
+      } else {
+        logInfo("Skip sampling ...")
+      }
     }
     flatTableSource.addCuboid(entity)
   }
@@ -91,7 +111,7 @@ class ParentSourceChooser(
     var path = ""
     if (flatTableSource != null && flatTableSource.getToBuildCuboids.size() > config.getPersistFlatTableThreshold) {
 
-      val df = flatTableSource.getFlattableDS
+      val df = flatTableSource.getFlatTableDS
       if (df.schema.nonEmpty) {
         val allUsedCols = flatTableSource.getToBuildCuboids.asScala.flatMap { c =>
           val dims = c.getOrderedDimensions.keySet().asScala.map(_.toString)
@@ -152,7 +172,7 @@ class ParentSourceChooser(
     buildSource
   }
 
-  private def getFlatTable(): NBuildSourceInfo = {
+  private def getFlatTable: NBuildSourceInfo = {
     //    val viewPath = persistFactViewIfNecessary()
     val sourceInfo = new NBuildSourceInfo
     sourceInfo.setSparkSession(ss)
@@ -162,7 +182,7 @@ class ParentSourceChooser(
     //    val needJoin = ParentSourceChooser.needJoinLookupTables(seg.getModel, toBuildTree)
     val flatTable = new CreateFlatTable(seg, toBuildTree, ss, sourceInfo)
     val afterJoin: Dataset[Row] = flatTable.generateDataset(needEncoding, true)
-    sourceInfo.setFlattableDS(afterJoin)
+    sourceInfo.setFlatTableDS(afterJoin)
 
     logInfo("No suitable ready layouts could be reused, generate dataset from flat table.")
     sourceInfo
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
index 065bd5e..c3828cd 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
@@ -58,6 +58,7 @@ public class ResourceDetectBeforeCubingJob extends SparkApplication {
             ResourceDetectUtils.write(new Path(config.getJobTmpShareDir(project, jobId), ResourceDetectUtils.countDistinctSuffix()),
                     ResourceDetectUtils.findCountDistinctMeasure(JavaConversions.asJavaCollection(seg.toBuildLayouts())));
             ParentSourceChooser datasetChooser = new ParentSourceChooser(spanningTree, seg, jobId, ss, config, false);
+            datasetChooser.setDetectStep();
             datasetChooser.decideSources();
             NBuildSourceInfo buildFromFlatTable = datasetChooser.flatTableSource();
             if (buildFromFlatTable != null) {
diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
index d12e9ea..e81aa8e 100644
--- a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
+++ b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
@@ -31,15 +31,16 @@ class ColumnDesc(val columnName: String,
                  val dataType: DataType,
                  val tableName: String,
                  val tableAliasName: String,
-                 val id: Int) extends Serializable {
+                 val id: Int,
+                 val rowKey: Boolean = false) extends Serializable {
   def identity: String = s"$tableAliasName.$columnName"
 
   def isColumnType: Boolean = true
 }
 
 object ColumnDesc {
-  def apply(columnName: String, dataType: DataType, tableName: String, tableAliasName: String, id: Int):
-  ColumnDesc = new ColumnDesc(columnName, dataType, tableName, tableAliasName, id)
+  def apply(columnName: String, dataType: DataType, tableName: String, tableAliasName: String, id: Int, rowKey: Boolean):
+  ColumnDesc = new ColumnDesc(columnName, dataType, tableName, tableAliasName, id, rowKey)
 }
 
 case class LiteralColumnDesc(override val columnName: String,
@@ -120,11 +121,21 @@ case class SegmentInfo(id: String,
     snapshotInfo = tableInfo
   }
 
-  def getAllLayoutSize(): Long = {
+  def getAllLayoutSize: Long = {
     layouts.map(_.getByteSize).sum
   }
 
-  def getSnapShot2JavaMap(): java.util.Map[String, String] = {
+  def getAllLayout: List[LayoutEntity] = {
+    layouts
+  }
+
+  def getAllLayoutJava: java.util.List[LayoutEntity] = {
+    val l: util.LinkedList[LayoutEntity] = new java.util.LinkedList()
+    layouts.foreach(o => l.add(o))
+    l
+  }
+
+  def getSnapShot2JavaMap: java.util.Map[String, String] = {
     snapshotInfo.asJava
   }
 
diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
index 970ff7b..30067ae 100644
--- a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
+++ b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
@@ -110,7 +110,7 @@ object MetadataConverter {
     val colToIndex = columnIDTuples.toMap
     columnIDTuples
       .foreach { co =>
-        dimensionIndex.put(co._2, toColumnDesc(co._1, co._2))
+        dimensionIndex.put(co._2, toColumnDesc(co._1, co._2, set.contains(co._1)))
       }
     dimensionIndex
   }
@@ -159,9 +159,9 @@ object MetadataConverter {
 
     val dimensionMap = dimensionMapping.toMap
     val shardByColumnsId = shardByColumns.asScala.toList
-            .map(column => dimensionMap.get(column))
-            .filter(v => v != null)
-            .map(column => Integer.valueOf(column.get))
+      .map(column => dimensionMap.get(column))
+      .filter(v => v != null)
+      .map(column => Integer.valueOf(column.get))
 
     val set = dimensionMapping.map(_._1).toSet
     val refs = cubeInstance.getAllColumns.asScala.diff(set)
@@ -204,13 +204,13 @@ object MetadataConverter {
     extractEntityAndMeasures(cubeInstance)._1.asJava
   }
 
-  private def toColumnDesc(ref: TblColRef, index: Int = -1) = {
+  private def toColumnDesc(ref: TblColRef, index: Int = -1, rowKey: Boolean = false) = {
     val dataType = SparkTypeUtil.toSparkType(KyDataType.getType(ref.getDatatype))
     val columnDesc = if (ref.getColumnDesc.isComputedColumn) {
       ComputedColumnDesc(ref.getName, dataType, ref.getTableRef.getTableName, ref.getTableRef.getAlias,
         index, ref.getExpressionInSourceDB)
     } else {
-      ColumnDesc(ref.getName, dataType, ref.getTableRef.getTableName, ref.getTableRef.getAlias, index)
+      ColumnDesc(ref.getName, dataType, ref.getTableRef.getTableName, ref.getTableRef.getAlias, index, rowKey)
     }
     columnDesc
   }