Posted to commits@kylin.apache.org by xx...@apache.org on 2021/01/04 06:05:04 UTC
[kylin] 01/06: KYLIN-4818 Calculate cuboid rowcount via HLL
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit a6f78cd53b951acf3ec966fa537dae3f2b82fd1e
Author: XiaoxiangYu <xx...@apache.org>
AuthorDate: Mon Nov 23 21:44:23 2020 +0800
KYLIN-4818 Calculate cuboid rowcount via HLL
---
.../kylin/common/annotation/Clarification.java | 6 +-
.../java/org/apache/kylin/cube/CubeSegment.java | 19 ++-
.../engine/spark/application/SparkApplication.java | 2 +-
.../engine/spark/builder/NBuildSourceInfo.java | 16 +--
.../kylin/engine/spark/job/CubeBuildJob.java | 27 ++++-
.../kylin/engine/spark/job/CuboidAggregator.scala | 1 +
.../kylin/engine/spark/job/CuboidStatistics.scala | 129 +++++++++++++++++++++
.../engine/spark/job/ParentSourceChooser.scala | 30 ++++-
.../spark/job/ResourceDetectBeforeCubingJob.java | 1 +
.../kylin/engine/spark/metadata/MetaData.scala | 21 +++-
.../engine/spark/metadata/MetadataConverter.scala | 12 +-
11 files changed, 234 insertions(+), 30 deletions(-)
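The change estimates each cuboid's row count with HyperLogLog instead of counting exactly: every sampled flat-table row is hashed per row-key column, the per-column hashes are combined per cuboid, and the combined hash feeds an HLLCounter whose cardinality estimate becomes the cuboid's row count. A toy sketch of that estimation loop, using only the HLLCounter and shaded-Guava hashing calls that appear in this patch (the data is made up):

    import org.apache.kylin.measure.hllc.HLLCounter
    import org.apache.kylin.shaded.com.google.common.hash.Hashing

    object HllRowCountSketch {
      def main(args: Array[String]): Unit = {
        val hf = Hashing.murmur3_128
        val counter = new HLLCounter(14) // 2^14 registers, same precision as CuboidInfo below
        var i = 0
        while (i < 100000) {
          // 100k rows but only 5000 distinct keys
          val hash = hf.newHasher.putUnencodedChars((i % 5000).toString).hash().padToLong()
          counter.addHashDirectly(hash)
          i += 1
        }
        println(counter.getCountEstimate) // ~5000; at this precision the error is around 1%
      }
    }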
diff --git a/core-common/src/main/java/org/apache/kylin/common/annotation/Clarification.java b/core-common/src/main/java/org/apache/kylin/common/annotation/Clarification.java
index b039e50..4b0ea2f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/annotation/Clarification.java
+++ b/core-common/src/main/java/org/apache/kylin/common/annotation/Clarification.java
@@ -30,9 +30,11 @@ import java.lang.annotation.Target;
ElementType.CONSTRUCTOR, ElementType.LOCAL_VARIABLE, ElementType.PACKAGE})
public @interface Clarification {
- Priority priority();
+ Priority priority() default Priority.MINOR;
- String msg() default "N/A";
+ String msg() default "null";
+
+ boolean deprecated() default false; // Please remove deprecated keys when Kylin 4 reaches GA
enum Priority {
MINOR,
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index d0224fb..715e684 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -31,6 +31,7 @@ import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.annotation.Clarification;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.Pair;
@@ -75,8 +76,10 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
@JsonProperty("date_range_end")
private long dateRangeEnd;
@JsonProperty("source_offset_start")
+ @Clarification(deprecated = true)
private long sourceOffsetStart;
@JsonProperty("source_offset_end")
+ @Clarification(deprecated = true)
private long sourceOffsetEnd;
@JsonProperty("status")
private SegmentStatusEnum status;
@@ -107,6 +110,7 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
private String binarySignature; // a hash of cube schema and dictionary ID, used for sanity check
@JsonProperty("dictionaries")
+ @Clarification(deprecated = true)
private ConcurrentHashMap<String, String> dictionaries; // table/column ==> dictionary resource path
@JsonProperty("snapshots")
private ConcurrentHashMap<String, String> snapshots; // table name ==> snapshot resource path
@@ -116,13 +120,16 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
@JsonProperty("source_partition_offset_start")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
+ @Clarification(deprecated = true)
private Map<Integer, Long> sourcePartitionOffsetStart = Maps.newHashMap();
@JsonProperty("source_partition_offset_end")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
+ @Clarification(deprecated = true)
private Map<Integer, Long> sourcePartitionOffsetEnd = Maps.newHashMap();
@JsonProperty("stream_source_checkpoint")
+ @Clarification(deprecated = true)
private String streamSourceCheckpoint;
@JsonProperty("additionalInfo")
@@ -530,11 +537,19 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
}
public String getStatisticsResourcePath() {
- return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid());
+ return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), ".seq");
+ }
+
+ public String getPreciseStatisticsResourcePath() {
+ return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), ".json");
}
public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId) {
- return ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + cubeSegmentId + ".seq";
+ return getStatisticsResourcePath(cubeName, cubeSegmentId, ".seq");
+ }
+
+ public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId, String suffix) {
+ return ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + cubeSegmentId + suffix;
}
@Override
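CubeSegment now resolves two statistics locations: the legacy sequence-file stats keep the .seq suffix, while the new precise (HLL-based) stats written by CubeBuildJob use .json. Assuming ResourceStore.CUBE_STATISTICS_ROOT is "/cube_statistics" (hedged; check the constant), the overloads resolve like this for a hypothetical cube and segment:

    import org.apache.kylin.cube.CubeSegment

    val legacy  = CubeSegment.getStatisticsResourcePath("sales_cube", "seg-0001")
    // => /cube_statistics/sales_cube/seg-0001.seq
    val precise = CubeSegment.getStatisticsResourcePath("sales_cube", "seg-0001", ".json")
    // => /cube_statistics/sales_cube/seg-0001.json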
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 98dde2b..7867474 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -219,7 +219,7 @@ public abstract class SparkApplication {
jobId = getParam(MetadataConstants.P_JOB_ID);
project = getParam(MetadataConstants.P_PROJECT_NAME);
if (getParam(MetadataConstants.P_CUBOID_NUMBER) != null) {
- layoutSize = Integer.valueOf(getParam(MetadataConstants.P_CUBOID_NUMBER));
+ layoutSize = Integer.parseInt(getParam(MetadataConstants.P_CUBOID_NUMBER));
}
try (KylinConfig.SetAndUnsetThreadLocalConfig autoCloseConfig = KylinConfig
.setAndUnsetThreadLocalConfig(MetaDumpUtil.loadKylinConfigFromHdfs(hdfsMetalUrl))) {
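A small cleanup along the way: Integer.parseInt returns a primitive int directly, whereas Integer.valueOf first allocates a boxed Integer that is immediately unboxed into layoutSize. For illustration:

    val a: Int = Integer.parseInt("42")  // parses straight to a primitive int
    val b: Int = Integer.valueOf("42")   // allocates a java.lang.Integer, then auto-unboxes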
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java
index 4ae87ef..51d7414 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java
@@ -34,7 +34,7 @@ import org.apache.kylin.shaded.com.google.common.base.Preconditions;
public class NBuildSourceInfo {
protected static final Logger logger = LoggerFactory.getLogger(NBuildSourceInfo.class);
- private Dataset<Row> flattableDS;
+ private Dataset<Row> flatTableDS;
private String viewFactTablePath;
private SparkSession ss;
private long byteSize;
@@ -52,12 +52,12 @@ public class NBuildSourceInfo {
this.byteSize = byteSize;
}
- public void setFlattableDS(Dataset<Row> flattableDS) {
- this.flattableDS = flattableDS;
+ public void setFlatTableDS(Dataset<Row> flatTableDS) {
+ this.flatTableDS = flatTableDS;
}
- public Dataset<Row> getFlattableDS() {
- return flattableDS;
+ public Dataset<Row> getFlatTableDS() {
+ return flatTableDS;
}
public Dataset<Row> getParentDS() {
@@ -66,9 +66,9 @@ public class NBuildSourceInfo {
Preconditions.checkNotNull(ss, "SparkSession is null in NBuildSourceInfo.");
return ss.read().parquet(parentStoragePath);
} else {
- Preconditions.checkState(flattableDS != null, "Path and DS can no be empty at the same time.");
- logger.info("parent storage path not exists, use flattable dataset.");
- return flattableDS;
+ Preconditions.checkState(flatTableDS != null, "Path and DS cannot be empty at the same time.");
+ logger.info("Parent storage path does not exist, use flat table dataset.");
+ return flatTableDS;
}
}
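Beyond the flattable → flatTable rename, getParentDS keeps its two-way behavior: read the persisted parent layout from parquet when a storage path is set, otherwise fall back to the in-memory flat table dataset. A minimal sketch of the fallback branch, assuming ss and flatTableDf already exist in the surrounding job:

    import org.apache.kylin.engine.spark.builder.NBuildSourceInfo
    import org.apache.spark.sql.{Dataset, Row, SparkSession}

    def parentOf(ss: SparkSession, flatTableDf: Dataset[Row]): Dataset[Row] = {
      val info = new NBuildSourceInfo
      info.setSparkSession(ss)
      info.setFlatTableDS(flatTableDf)
      info.getParentDS // no parent storage path set, so this returns flatTableDf
    }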
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 38a101b..1028911 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -59,6 +59,8 @@ import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.storage.StorageFactory;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
@@ -69,6 +71,7 @@ import org.apache.kylin.shaded.com.google.common.base.Preconditions;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
+import scala.Tuple2;
import scala.collection.JavaConversions;
public class CubeBuildJob extends SparkApplication {
@@ -113,6 +116,7 @@ public class CubeBuildJob extends SparkApplication {
// choose source
ParentSourceChooser sourceChooser = new ParentSourceChooser(spanningTree, seg, jobId, ss, config, true);
sourceChooser.decideSources();
+ Tuple2<String, AggInfo>[] aggInfos = sourceChooser.getAggInfo();
NBuildSourceInfo buildFromFlatTable = sourceChooser.flatTableSource();
Map<Long, NBuildSourceInfo> buildFromLayouts = sourceChooser.reuseSources();
@@ -131,7 +135,8 @@ public class CubeBuildJob extends SparkApplication {
infos.recordSpanningTree(segId, spanningTree);
logger.info("Updating segment info");
- updateSegmentInfo(getParam(MetadataConstants.P_CUBE_ID), seg, buildFromFlatTable.getFlattableDS().count());
+ assert buildFromFlatTable != null;
+ updateSegmentInfo(getParam(MetadataConstants.P_CUBE_ID), seg, buildFromFlatTable.getFlatTableDS().count());
}
updateSegmentSourceBytesSize(getParam(MetadataConstants.P_CUBE_ID),
ResourceDetectUtils.getSegmentSourceSize(shareDir));
@@ -157,6 +162,26 @@ public class CubeBuildJob extends SparkApplication {
List<CubeSegment> cubeSegments = Lists.newArrayList();
CubeSegment segment = cubeCopy.getSegmentById(segmentInfo.id());
segment.setSizeKB(segmentInfo.getAllLayoutSize() / 1024);
+ List<String> cuboidStatistics = new LinkedList<>();
+
+ String template = "{\"cuboid\":%d, \"rows\": %d, \"size\": %d}";
+ for (LayoutEntity layoutEntity : segmentInfo.getAllLayoutJava()) {
+ cuboidStatistics.add(String.format(template, layoutEntity.getId(), layoutEntity.getRows(), layoutEntity.getByteSize()));
+ }
+
+ JavaSparkContext jsc = JavaSparkContext.fromSparkContext(ss.sparkContext());
+ JavaRDD<String> cuboidStatRdd = jsc.parallelize(cuboidStatistics);
+ for (String cuboid : cuboidStatistics) {
+ logger.info("Statistics \t: {}", cuboid);
+ }
+ String path = config.getHdfsWorkingDirectory() + segment.getPreciseStatisticsResourcePath();
+ logger.info("Saving {} {}", path, segmentInfo);
+ try {
+ cuboidStatRdd.saveAsTextFile(path);
+ } catch (Exception e) {
+ logger.error("Failed to save cuboid statistics to {}", path, e);
+ }
+
segment.setLastBuildTime(System.currentTimeMillis());
segment.setLastBuildJobID(getParam(MetadataConstants.P_JOB_ID));
segment.setInputRecords(sourceRowCount);
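Each layout's estimated rows and byte size are serialized as one-line JSON objects and written with saveAsTextFile. Note that despite the .json suffix, path ends up as a directory of part-* files, one JSON object per line, not a single file. The line format, with illustrative numbers:

    val template = "{\"cuboid\":%d, \"rows\": %d, \"size\": %d}"
    println(template.format(1023L, 58764L, 3145728L))
    // {"cuboid":1023, "rows": 58764, "size": 3145728}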
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
index 54afadf..84beaf2 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
@@ -47,6 +47,7 @@ object CuboidAggregator {
aggInternal(ss, dataSet, dimensions, measures, isSparkSql)
}
+ //noinspection ScalaStyle
def aggInternal(ss: SparkSession,
dataSet: DataFrame,
dimensions: util.Set[Integer],
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatistics.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatistics.scala
new file mode 100644
index 0000000..d0ea199
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatistics.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job
+
+
+import org.apache.kylin.engine.spark.metadata.SegmentInfo
+import org.apache.kylin.measure.hllc.HLLCounter
+import org.apache.kylin.shaded.com.google.common.hash.{HashFunction, Hashing}
+import org.apache.spark.sql.{Dataset, Row}
+
+import scala.collection.mutable
+
+
+object CuboidStatistics {
+
+ def sample(inputDs: Dataset[Row], seg: SegmentInfo): Array[(String, AggInfo)] = {
+ val rkc = seg.allColumns.count(c => c.rowKey)
+ val res = inputDs.rdd //.sample(withReplacement = false, 0.3)
+ .mapPartitions(new CuboidStatistics(seg.getAllLayout.map(x => x.getId), rkc).statisticsInPartition)
+ val l = res.map(a => (a.key, a)).reduceByKey((a, b) => a.merge(b)).collect()
+ l.foreach(x => println(x._1 + " >>><<< " + x._2.cuboid.counter.getCountEstimate))
+ l
+ }
+}
+
+class CuboidStatistics(ids: List[Long], rkc: Int) extends Serializable {
+ private val info = mutable.Map[String, AggInfo]()
+ val allCuboidsBitSet: Array[Array[Integer]] = getCuboidBitSet(ids, rkc)
+ private val hf: HashFunction = Hashing.murmur3_128
+ private val rowHashCodesLong = new Array[Long](rkc)
+
+ def statisticsInPartition(rows: Iterator[Row]): Iterator[AggInfo] = {
+ init()
+ rows.foreach(update)
+ info.valuesIterator
+ }
+
+ def init(): Unit = {
+ ids.foreach(i => info.put(i.toString, AggInfo(i.toString)))
+ }
+
+ def update(r: Row): Unit = {
+ updateCuboid(r)
+ }
+
+ def updateCuboid(r: Row): Unit = {
+ // generate hash for each row key column
+
+ var idx = 0
+ while (idx < rkc) {
+ val hc = hf.newHasher
+ val cell = r.get(idx)
+ // guard the null before calling toString, otherwise a null cell throws an NPE
+ val colValue = if (cell == null) "0" else cell.toString
+ // add the column ordinal to the hash value to distinguish between (a,b) and (b,a)
+ rowHashCodesLong(idx) = hc.putUnencodedChars(colValue).hash().padToLong() + idx
+ idx += 1
+ }
+
+ // use the row key column hash to get a consolidated hash for each cuboid
+ val n = allCuboidsBitSet.length
+ idx = 0
+ while (idx < n) {
+ var value: Long = 0
+ var position = 0
+ while (position < allCuboidsBitSet(idx).length) {
+ value += rowHashCodesLong(allCuboidsBitSet(idx)(position))
+ position += 1
+ }
+ info(ids(idx).toString).cuboid.counter.addHashDirectly(value)
+ idx += 1
+ }
+ }
+
+ def getCuboidBitSet(cuboidIds: List[Long], nRowKey: Int): Array[Array[Integer]] = {
+ val allCuboidsBitSet: Array[Array[Integer]] = new Array[Array[Integer]](cuboidIds.length)
+ var j: Int = 0
+ while (j < cuboidIds.length) {
+ val cuboidId: Long = cuboidIds(j)
+ allCuboidsBitSet(j) = new Array[Integer](java.lang.Long.bitCount(cuboidId))
+ var mask: Long = 1L << (nRowKey - 1)
+ var position: Int = 0
+ var i: Int = 0
+ while (i < nRowKey) {
+ if ((mask & cuboidId) > 0) {
+ allCuboidsBitSet(j)(position) = i
+ position += 1
+ }
+ mask = mask >> 1
+ i += 1
+ }
+ j += 1
+ }
+ allCuboidsBitSet
+ }
+}
+
+case class AggInfo(key: String,
+ cuboid: CuboidInfo = CuboidInfo(new HLLCounter()),
+ sample: SampleInfo = SampleInfo(),
+ dimension: DimensionInfo = DimensionInfo()) {
+ def merge(o: AggInfo): AggInfo = {
+ this.cuboid.counter.merge(o.cuboid.counter)
+ this
+ }
+}
+
+case class CuboidInfo(counter: HLLCounter = new HLLCounter(14))
+
+case class SampleInfo(data: Array[String] = new Array(3))
+
+case class DimensionInfo(range: mutable.Map[String, String] = mutable.Map[String, String]())
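getCuboidBitSet decodes a cuboid id into the ordinals of the row-key columns it covers, scanning the id's bits from the highest row-key position down. A worked example with three row-key columns: id 5 is binary 101, which selects ordinals 0 and 2:

    val stats = new CuboidStatistics(List(5L), 3)
    stats.allCuboidsBitSet.foreach(bits => println(bits.mkString(","))) // prints: 0,2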
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
index 7697fd6..446f3e6 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
@@ -39,17 +39,25 @@ class ParentSourceChooser(
config: KylinConfig,
needEncoding: Boolean) extends Logging {
+ var aggInfo: Array[(String, AggInfo)] = _
+
// build from built cuboid.
var reuseSources: java.util.Map[java.lang.Long, NBuildSourceInfo] = Maps.newHashMap()
- // build from flattable.
+ // build from flatTable.
var flatTableSource: NBuildSourceInfo = _
+ var detectStep = false
+
//TODO: MetadataConverter don't have getCubeDesc() now
/*val flatTableDesc = new CubeJoinedFlatTableDesc(
MetadataConverter.getCubeDesc(seg.getCube),
ParentSourceChooser.needJoinLookupTables(seg.getModel, toBuildTree))*/
+ def setDetectStep(): Unit =
+ detectStep = true
+
+ def getAggInfo: Array[(String, AggInfo)] = aggInfo
def decideSources(): Unit = {
toBuildTree.getRootIndexEntities.asScala.foreach { entity =>
@@ -72,7 +80,19 @@ class ParentSourceChooser(
builder.checkDupKey()
seg = builder.buildSnapshot
}
- flatTableSource = getFlatTable()
+ flatTableSource = getFlatTable
+
+ val rowKeyColumns: Seq[String] = seg.allColumns.filter(c => c.rowKey).map(c => c.id.toString)
+ if (aggInfo == null && !detectStep) {
+ logInfo("Start sampling ...")
+ val coreDs = flatTableSource.getFlatTableDS.select(rowKeyColumns.head, rowKeyColumns.tail: _*)
+ aggInfo = CuboidStatistics.sample(coreDs, seg)
+ logInfo("Finish sampling ...")
+ val statisticsStr = aggInfo.sortBy(x => x._1).map(x => x._1 + ":" + x._2.cuboid.counter.getCountEstimate).mkString("\n")
+ logInfo(statisticsStr)
+ } else {
+ logInfo("Skip sampling ...")
+ }
}
flatTableSource.addCuboid(entity)
}
@@ -91,7 +111,7 @@ class ParentSourceChooser(
var path = ""
if (flatTableSource != null && flatTableSource.getToBuildCuboids.size() > config.getPersistFlatTableThreshold) {
- val df = flatTableSource.getFlattableDS
+ val df = flatTableSource.getFlatTableDS
if (df.schema.nonEmpty) {
val allUsedCols = flatTableSource.getToBuildCuboids.asScala.flatMap { c =>
val dims = c.getOrderedDimensions.keySet().asScala.map(_.toString)
@@ -152,7 +172,7 @@ class ParentSourceChooser(
buildSource
}
- private def getFlatTable(): NBuildSourceInfo = {
+ private def getFlatTable: NBuildSourceInfo = {
// val viewPath = persistFactViewIfNecessary()
val sourceInfo = new NBuildSourceInfo
sourceInfo.setSparkSession(ss)
@@ -162,7 +182,7 @@ class ParentSourceChooser(
// val needJoin = ParentSourceChooser.needJoinLookupTables(seg.getModel, toBuildTree)
val flatTable = new CreateFlatTable(seg, toBuildTree, ss, sourceInfo)
val afterJoin: Dataset[Row] = flatTable.generateDataset(needEncoding, true)
- sourceInfo.setFlattableDS(afterJoin)
+ sourceInfo.setFlatTableDS(afterJoin)
logInfo("No suitable ready layouts could be reused, generate dataset from flat table.")
sourceInfo
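Sampling is meant to run exactly once, in the real build: the resource-detect pass (next diff) calls setDetectStep() so decideSources() skips it there. Roughly how the two passes differ, with spanningTree, seg, jobId, ss and config assumed in scope:

    // build pass: sampling runs and aggInfo is filled
    val buildChooser = new ParentSourceChooser(spanningTree, seg, jobId, ss, config, true)
    buildChooser.decideSources()
    val aggInfo = buildChooser.getAggInfo

    // detect pass: setDetectStep() makes decideSources() log "Skip sampling ..."
    val detectChooser = new ParentSourceChooser(spanningTree, seg, jobId, ss, config, false)
    detectChooser.setDetectStep()
    detectChooser.decideSources()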
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
index 065bd5e..c3828cd 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
@@ -58,6 +58,7 @@ public class ResourceDetectBeforeCubingJob extends SparkApplication {
ResourceDetectUtils.write(new Path(config.getJobTmpShareDir(project, jobId), ResourceDetectUtils.countDistinctSuffix()),
ResourceDetectUtils.findCountDistinctMeasure(JavaConversions.asJavaCollection(seg.toBuildLayouts())));
ParentSourceChooser datasetChooser = new ParentSourceChooser(spanningTree, seg, jobId, ss, config, false);
+ datasetChooser.setDetectStep();
datasetChooser.decideSources();
NBuildSourceInfo buildFromFlatTable = datasetChooser.flatTableSource();
if (buildFromFlatTable != null) {
diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
index d12e9ea..e81aa8e 100644
--- a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
+++ b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
@@ -31,15 +31,16 @@ class ColumnDesc(val columnName: String,
val dataType: DataType,
val tableName: String,
val tableAliasName: String,
- val id: Int) extends Serializable {
+ val id: Int,
+ val rowKey: Boolean = false) extends Serializable {
def identity: String = s"$tableAliasName.$columnName"
def isColumnType: Boolean = true
}
object ColumnDesc {
- def apply(columnName: String, dataType: DataType, tableName: String, tableAliasName: String, id: Int):
- ColumnDesc = new ColumnDesc(columnName, dataType, tableName, tableAliasName, id)
+ def apply(columnName: String, dataType: DataType, tableName: String, tableAliasName: String, id: Int, rowKey: Boolean):
+ ColumnDesc = new ColumnDesc(columnName, dataType, tableName, tableAliasName, id, rowKey)
}
case class LiteralColumnDesc(override val columnName: String,
@@ -120,11 +121,21 @@ case class SegmentInfo(id: String,
snapshotInfo = tableInfo
}
- def getAllLayoutSize(): Long = {
+ def getAllLayoutSize: Long = {
layouts.map(_.getByteSize).sum
}
- def getSnapShot2JavaMap(): java.util.Map[String, String] = {
+ def getAllLayout: List[LayoutEntity] = {
+ layouts
+ }
+
+ def getAllLayoutJava: java.util.List[LayoutEntity] = {
+ val l: util.LinkedList[LayoutEntity] = new java.util.LinkedList()
+ layouts.foreach(o => l.add(o))
+ l
+ }
+
+ def getSnapShot2JavaMap: java.util.Map[String, String] = {
snapshotInfo.asJava
}
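ColumnDesc gains a rowKey flag (default false) so that ParentSourceChooser can select the row-key columns of the flat table for sampling. A hypothetical column marked as part of the row key:

    import org.apache.kylin.engine.spark.metadata.ColumnDesc
    import org.apache.spark.sql.types.DataTypes

    // hypothetical date dimension on the fact table, ordinal 0 in the row key
    val col = ColumnDesc("PART_DT", DataTypes.DateType, "KYLIN_SALES", "KYLIN_SALES", 0, rowKey = true)
    // ParentSourceChooser later does: seg.allColumns.filter(c => c.rowKey)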
diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
index 970ff7b..30067ae 100644
--- a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
+++ b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
@@ -110,7 +110,7 @@ object MetadataConverter {
val colToIndex = columnIDTuples.toMap
columnIDTuples
.foreach { co =>
- dimensionIndex.put(co._2, toColumnDesc(co._1, co._2))
+ dimensionIndex.put(co._2, toColumnDesc(co._1, co._2, set.contains(co._1)))
}
dimensionIndex
}
@@ -159,9 +159,9 @@ object MetadataConverter {
val dimensionMap = dimensionMapping.toMap
val shardByColumnsId = shardByColumns.asScala.toList
- .map(column => dimensionMap.get(column))
- .filter(v => v != null)
- .map(column => Integer.valueOf(column.get))
+ .map(column => dimensionMap.get(column))
+ .filter(v => v != null)
+ .map(column => Integer.valueOf(column.get))
val set = dimensionMapping.map(_._1).toSet
val refs = cubeInstance.getAllColumns.asScala.diff(set)
@@ -204,13 +204,13 @@ object MetadataConverter {
extractEntityAndMeasures(cubeInstance)._1.asJava
}
- private def toColumnDesc(ref: TblColRef, index: Int = -1) = {
+ private def toColumnDesc(ref: TblColRef, index: Int = -1, rowKey: Boolean = false) = {
val dataType = SparkTypeUtil.toSparkType(KyDataType.getType(ref.getDatatype))
val columnDesc = if (ref.getColumnDesc.isComputedColumn) {
ComputedColumnDesc(ref.getName, dataType, ref.getTableRef.getTableName, ref.getTableRef.getAlias,
index, ref.getExpressionInSourceDB)
} else {
- ColumnDesc(ref.getName, dataType, ref.getTableRef.getTableName, ref.getTableRef.getAlias, index)
+ ColumnDesc(ref.getName, dataType, ref.getTableRef.getTableName, ref.getTableRef.getAlias, index, rowKey)
}
columnDesc
}
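toColumnDesc threads the flag through with a default of false, so only the dimension loop above passes set.contains(co._1) and every other call site is untouched. For clarity, the three forms that now compile (ref and idx are hypothetical; the helper is private, shown for illustration):

    toColumnDesc(ref)                    // index = -1, rowKey = false
    toColumnDesc(ref, idx)               // rowKey = false
    toColumnDesc(ref, idx, rowKey = true)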