Posted to commits@kylin.apache.org by sh...@apache.org on 2018/11/26 10:39:58 UTC

[kylin] branch master updated: KYLIN-3693 TopN incorrect in Spark engine

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new ac7a86a  KYLIN-3693 TopN incorrect in Spark engine
ac7a86a is described below

commit ac7a86a62a68301a195599e12cc8d7ddeeb73abd
Author: shaofengshi <sh...@apache.org>
AuthorDate: Mon Nov 26 11:24:32 2018 +0800

    KYLIN-3693 TopN incorrect in Spark engine
---
 .../kylin/measure/bitmap/BitmapAggregator.java     | 10 ++++--
 .../apache/kylin/measure/hllc/HLLCAggregator.java  | 10 ++++--
 .../measure/percentile/PercentileAggregator.java   | 10 ++++--
 .../apache/kylin/measure/raw/RawAggregator.java    | 12 ++++---
 .../org/apache/kylin/measure/topn/Counter.java     | 18 +---------
 .../kylin/measure/topn/DoubleDeltaSerializer.java  |  2 +-
 .../apache/kylin/measure/topn/TopNAggregator.java  |  8 +++--
 .../kylin/measure/topn/TopNCounterSerializer.java  |  2 +-
 .../apache/kylin/measure/topn/TopNMeasureType.java | 41 +++++++++++++---------
 .../kylin/engine/mr/steps/SegmentReEncoder.java    | 11 ++----
 .../kylin/engine/spark/SparkCubingByLayer.java     |  8 ++---
 .../kylin/engine/spark/SparkCubingMerge.java       | 17 +++++----
 .../org/apache/kylin/query/ITKylinQueryTest.java   | 18 +++++-----
 .../query/sql_distinct_precisely/query03.sql       |  6 ++--
 .../query/sql_distinct_precisely/query04.sql       |  6 ++--
 .../query03.sql                                    |  2 +-
 .../query04.sql}                                   |  1 +
 17 files changed, 97 insertions(+), 85 deletions(-)

diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
index 19fa49e..d57af48 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
@@ -45,8 +45,14 @@ public class BitmapAggregator extends MeasureAggregator<BitmapCounter> {
 
     @Override
     public BitmapCounter aggregate(BitmapCounter value1, BitmapCounter value2) {
-        value1.orWith(value2);
-        return value1;
+        BitmapCounter merged = bitmapFactory.newBitmap();
+        if (value1 != null) {
+            merged.orWith(value1);
+        }
+        if (value2 != null) {
+            merged.orWith(value2);
+        }
+        return merged;
     }
 
     @Override
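
Note on the aggregator changes: this hunk and the three that follow (HLLCAggregator, PercentileAggregator, RawAggregator) all make the same change -- aggregate() now builds a fresh counter instead of folding value2 into value1 and returning value1. The in-place version is fine when the engine owns both objects, but in the Spark engine the inputs of an aggregate function may be reused or still referenced by other records, so mutating them can double-count or corrupt measures -- which is presumably how the incorrect TopN results arose. A minimal sketch of the contract change, using a hypothetical LongCounter rather than any Kylin class:

    // Hypothetical LongCounter; illustrates the old mutating contract vs. the
    // new copy-on-aggregate contract adopted by this commit.
    public class AggregateContractSketch {

        static final class LongCounter {
            long value;
            LongCounter(long value) { this.value = value; }
        }

        // Old style: folds value2 into value1 and hands value1 back.
        static LongCounter aggregateInPlace(LongCounter value1, LongCounter value2) {
            value1.value += value2.value;
            return value1;
        }

        // New style: never touch the inputs, return a fresh object.
        static LongCounter aggregateCopying(LongCounter value1, LongCounter value2) {
            if (value1 == null) return new LongCounter(value2.value);
            if (value2 == null) return new LongCounter(value1.value);
            return new LongCounter(value1.value + value2.value);
        }

        public static void main(String[] args) {
            LongCounter row = new LongCounter(10);
            aggregateInPlace(row, new LongCounter(5));
            System.out.println(row.value);                      // 15 -- the input object was modified

            LongCounter a = new LongCounter(10);
            LongCounter b = new LongCounter(5);
            System.out.println(aggregateCopying(a, b).value);   // 15
            System.out.println(a.value);                        // 10 -- inputs untouched
        }
    }
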
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
index 4e09265..a134f92 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
@@ -47,8 +47,14 @@ public class HLLCAggregator extends MeasureAggregator<HLLCounter> {
 
     @Override
     public HLLCounter aggregate(HLLCounter value1, HLLCounter value2) {
-        value1.merge(value2);
-        return value1;
+        if (value1 == null) {
+            return new HLLCounter(value2);
+        } else if (value2 == null) {
+            return new HLLCounter(value1);
+        }
+        HLLCounter result = new HLLCounter(value1);
+        result.merge(value2);
+        return result;
     }
 
     @Override
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggregator.java
index ef8896a..b7185e3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggregator.java
@@ -43,8 +43,14 @@ public class PercentileAggregator extends MeasureAggregator<PercentileCounter> {
 
     @Override
     public PercentileCounter aggregate(PercentileCounter value1, PercentileCounter value2) {
-        value1.merge(value2);
-        return value1;
+        if (value1 == null) {
+            return new PercentileCounter(value2);
+        } else if (value2 == null) {
+            return new PercentileCounter(value1);
+        }
+        PercentileCounter merged = new PercentileCounter(value1);
+        merged.merge(value2);
+        return merged;
     }
 
     @Override
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
index 2ee36e3..fff316c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
@@ -21,6 +21,7 @@ package org.apache.kylin.measure.raw;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.measure.MeasureAggregator;
 
@@ -49,13 +50,14 @@ public class RawAggregator extends MeasureAggregator<List<ByteArray>> {
     @Override
     public List<ByteArray> aggregate(List<ByteArray> value1, List<ByteArray> value2) {
         if (value1 == null) {
-            return value2;
+            return Lists.newArrayList(value2);
         } else if (value2 == null) {
-            return value1;
+            return Lists.newArrayList(value1);
         }
-
-        value1.addAll(value2);
-        return value1;
+        List<ByteArray> result = new ArrayList<>(value1.size() + value2.size());
+        result.addAll(value1);
+        result.addAll(value2);
+        return result;
     }
 
     @Override
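
Note: RawAggregator also copies in the null branches. Returning value2 (or value1) directly would hand the caller an alias of an input list, so any later modification of the "result" would leak back into that input. A tiny standalone demo of the aliasing hazard (Guava's Lists is the same class imported above; the demo class itself is illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import com.google.common.collect.Lists;

    public class AliasingDemo {
        public static void main(String[] args) {
            List<String> value2 = new ArrayList<>(List.of("a"));
            List<String> aliased = value2;                      // old behaviour: same object returned
            List<String> copied = Lists.newArrayList(value2);   // new behaviour: defensive copy
            aliased.add("b");
            System.out.println(value2);   // [a, b] -- the original changed through the alias
            System.out.println(copied);   // [a]    -- the copy is unaffected
        }
    }
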
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
index d8fdc6e..219c712 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
@@ -18,10 +18,6 @@
 
 package org.apache.kylin.measure.topn;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
 import java.io.Serializable;
 
 /**
@@ -29,7 +25,7 @@ import java.io.Serializable;
  * 
  * @param <T>
  */
-public class Counter<T> implements Externalizable, Serializable{
+public class Counter<T> implements Serializable{
 
     protected T item;
     protected double count;
@@ -67,16 +63,4 @@ public class Counter<T> implements Externalizable, Serializable{
         return item + ":" + count;
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        item = (T) in.readObject();
-        count = in.readDouble();
-    }
-
-    @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(item);
-        out.writeDouble(count);
-    }
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java
index ac925e2..1cb63e4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java
@@ -32,7 +32,7 @@ public class DoubleDeltaSerializer implements java.io.Serializable {
     static final int LENGTH_BITS = 23;
 
     static final long[] MASKS = new long[64];
-    {
+    static {
         for (int i = 0; i < MASKS.length; i++) {
             MASKS[i] = (1L << i) - 1;
         }
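
Note: MASKS is a static table, but the original initializer was an instance initializer block, so the array was only populated when the first DoubleDeltaSerializer was constructed and then needlessly re-filled on every subsequent construction. Declaring the block static ties the one-time initialization to class loading. A standalone illustration of the difference (hypothetical class, not Kylin code):

    // An instance initializer runs once per constructed object; a static
    // initializer runs exactly once, when the class is initialized.
    public class InitBlocks {
        static final long[] MASKS = new long[64];

        static {
            for (int i = 0; i < MASKS.length; i++) {
                MASKS[i] = (1L << i) - 1;   // low i bits set
            }
        }

        public static void main(String[] args) {
            System.out.println(Long.toBinaryString(MASKS[5]));   // 11111
        }
    }
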
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
index 34ceb9c..bc2bc36 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
@@ -46,8 +46,12 @@ public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
 
     @Override
     public TopNCounter<ByteArray> aggregate(TopNCounter<ByteArray> value1, TopNCounter<ByteArray> value2) {
-        value1.merge(value2);
-        return value1;
+        int thisCapacity = value1.getCapacity();
+        TopNCounter<ByteArray> aggregated = new TopNCounter<>(thisCapacity * 2);
+        aggregated.merge(value1);
+        aggregated.merge(value2);
+        aggregated.retain(thisCapacity);
+        return aggregated;
     }
 
     @Override
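
Note: TopNAggregator now merges both inputs into a brand-new counter sized at twice the capacity and then retains the original capacity, instead of merging value2 into value1. Merging into an over-sized summary and truncating afterwards is the usual way to combine two bounded top-N sketches while keeping the inputs intact; the exact TopNCounter/Space-Saving semantics are simplified away in the sketch below, which only shows the merge-then-retain shape with a plain map (all names here are illustrative):

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class TopNMergeSketch {

        // Merge two item->count summaries into a fresh map, then keep the top `capacity`.
        static Map<String, Double> mergeTopN(Map<String, Double> a, Map<String, Double> b, int capacity) {
            Map<String, Double> merged = new HashMap<>(a);            // copy -- never mutate the inputs
            b.forEach((k, v) -> merged.merge(k, v, Double::sum));
            return merged.entrySet().stream()
                    .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                    .limit(capacity)                                   // the "retain(thisCapacity)" step
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                            (x, y) -> x, LinkedHashMap::new));
        }

        public static void main(String[] args) {
            Map<String, Double> a = new HashMap<>(Map.of("x", 5.0, "y", 3.0));
            Map<String, Double> b = new HashMap<>(Map.of("y", 4.0, "z", 2.0));
            System.out.println(mergeTopN(a, b, 2));   // {y=7.0, x=5.0}
            System.out.println(a);                    // inputs unchanged
        }
    }
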
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
index eff510f..aec8d6d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
@@ -93,7 +93,7 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr
         int keyLength = in.getInt();
         double[] counters = dds.deserialize(in);
 
-        TopNCounter<ByteArray> counter = new TopNCounter<ByteArray>(capacity);
+        TopNCounter<ByteArray> counter = new TopNCounter<>(capacity);
         ByteArray byteArray;
         byte[] keyArray = new byte[size * keyLength];
         int offset = 0;
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index d7b1bd7..d53a70a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -173,24 +173,11 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
                 TopNCounter<ByteArray> topNCounter = value;
 
                 if (newDimensionEncodings == null) {
-                    literalCols = getTopNLiteralColumn(measureDesc.getFunction());
-                    dimensionEncodings = getDimensionEncodings(measureDesc.getFunction(), literalCols, oldDicts);
-                    keyLength = 0;
-                    boolean hasDictEncoding = false;
-                    for (DimensionEncoding encoding : dimensionEncodings) {
-                        keyLength += encoding.getLengthOfEncoding();
-                        if (encoding instanceof DictionaryDimEnc) {
-                            hasDictEncoding = true;
+                    synchronized (MeasureIngester.class) {
+                        if (newDimensionEncodings == null) {
+                            initialize(measureDesc, oldDicts, newDicts);
                         }
                     }
-
-                    newDimensionEncodings = getDimensionEncodings(measureDesc.getFunction(), literalCols, newDicts);
-                    newKeyLength = 0;
-                    for (DimensionEncoding encoding : newDimensionEncodings) {
-                        newKeyLength += encoding.getLengthOfEncoding();
-                    }
-
-                    needReEncode = hasDictEncoding;
                 }
 
                 if (needReEncode == false) {
@@ -218,6 +205,28 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
                 }
                 return topNCounter;
             }
+
+            private void initialize(MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts,
+                    Map<TblColRef, Dictionary<String>> newDicts) {
+                literalCols = getTopNLiteralColumn(measureDesc.getFunction());
+                dimensionEncodings = getDimensionEncodings(measureDesc.getFunction(), literalCols, oldDicts);
+                keyLength = 0;
+                boolean hasDictEncoding = false;
+                for (DimensionEncoding encoding : dimensionEncodings) {
+                    keyLength += encoding.getLengthOfEncoding();
+                    if (encoding instanceof DictionaryDimEnc) {
+                        hasDictEncoding = true;
+                    }
+                }
+
+                newDimensionEncodings = getDimensionEncodings(measureDesc.getFunction(), literalCols, newDicts);
+                newKeyLength = 0;
+                for (DimensionEncoding encoding : newDimensionEncodings) {
+                    newKeyLength += encoding.getLengthOfEncoding();
+                }
+
+                needReEncode = hasDictEncoding;
+            }
         };
     }
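
Note: the one-time computation of literal columns, dimension encodings and key lengths moves into initialize(), called behind a check / synchronize / re-check guard, so that concurrent calls to reEncodeDictionary initialize the shared fields only once. This is the classic lazy-initialization shape; a generic sketch of it (hypothetical LazyHolder, not Kylin code -- note the textbook version also marks the guarded field volatile):

    public class LazyHolder {
        private static volatile Object cached;   // volatile so the fast path reads a fully built value

        static Object get() {
            if (cached == null) {                 // fast path, no lock
                synchronized (LazyHolder.class) {
                    if (cached == null) {         // re-check under the lock
                        cached = expensiveInit();
                    }
                }
            }
            return cached;
        }

        private static Object expensiveInit() {
            return new Object();
        }
    }
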
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
index 35fa8af..57f7d70 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
@@ -68,6 +68,7 @@ public class SegmentReEncoder implements Serializable {
     private BufferedMeasureCodec codec;
     private CubeDesc cubeDesc;
     private KylinConfig kylinConfig;
+    private Text textValue = new Text();
 
     public SegmentReEncoder(CubeDesc cubeDesc, CubeSegment mergingSeg, CubeSegment mergedSeg, KylinConfig kylinConfig) {
         this.cubeDesc = cubeDesc;
@@ -136,10 +137,8 @@ public class SegmentReEncoder implements Serializable {
             }
 
             ByteBuffer valueBuf = codec.encode(measureObjs);
-            byte[] resultValue = new byte[valueBuf.position()];
-            System.arraycopy(valueBuf.array(), 0, resultValue, 0, valueBuf.position());
-
-            return Pair.newPair(processKey(key), new Text(resultValue));
+            textValue.set(valueBuf.array(), 0, valueBuf.position());
+            return Pair.newPair(processKey(key), textValue);
         } else {
             return Pair.newPair(processKey(key), value);
         }
@@ -167,10 +166,6 @@ public class SegmentReEncoder implements Serializable {
                 measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
             }
 
-            ByteBuffer valueBuf = codec.encode(measureObjs);
-            byte[] resultValue = new byte[valueBuf.position()];
-            System.arraycopy(valueBuf.array(), 0, resultValue, 0, valueBuf.position());
-
         }
         return Pair.newPair(processKey(key), measureObjs);
     }
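
Note: reEncode() now reuses a single Text instance and fills it with Text.set(byte[], int, int) instead of copying the encoded buffer into a fresh byte[] and a fresh Text per record; the same idiom is applied in SparkCubingByLayer below. The second hunk deletes an encode whose result was never used. Reusing a writable follows the usual Hadoop convention, with the caveat that the caller must consume or copy the value before the next call. A small sketch of the idiom (Text and ByteBuffer are the real APIs; the surrounding demo class is illustrative):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.io.Text;

    public class TextReuseSketch {
        private final Text textValue = new Text();   // one buffer, reused across calls

        Text encode(ByteBuffer valueBuf) {
            // copies the bytes into the Text's internal buffer; no new byte[] per record
            textValue.set(valueBuf.array(), 0, valueBuf.position());
            return textValue;
        }

        public static void main(String[] args) {
            TextReuseSketch sketch = new TextReuseSketch();
            ByteBuffer buf = ByteBuffer.allocate(16);
            buf.put("hello".getBytes(StandardCharsets.UTF_8));
            System.out.println(sketch.encode(buf));   // hello
        }
    }
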
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 2f453c5..f3b0a13 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -246,12 +246,10 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                             }
                         }
                         ByteBuffer valueBuf = codec.encode(tuple2._2());
-                        byte[] encodedBytes = new byte[valueBuf.position()];
-                        System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
-                        return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()),
-                                new org.apache.hadoop.io.Text(encodedBytes));
+                        org.apache.hadoop.io.Text textResult =  new org.apache.hadoop.io.Text();
+                        textResult.set(valueBuf.array(), 0, valueBuf.position());
+                        return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()), textResult);
                     }
-
                 }).saveAsNewAPIHadoopDataset(job.getConfiguration());
         logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath);
     }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index 5037647..a3b13a8 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -195,7 +195,7 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
                     JavaPairRDD segRdd = SparkUtil.parseInputPath(path, fs, sc, Text.class, Text.class);
                     CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
                     // re-encode with new dictionaries
-                    JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
+                    JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodeCuboidFunction(cubeName,
                             sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
                     mergingSegs.add(newEcoddedRdd);
                 }
@@ -215,7 +215,7 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
                         final String cuboidInputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
                         JavaPairRDD<Text, Text> segRdd = sc.sequenceFile(cuboidInputPath, Text.class, Text.class);
                         // re-encode with new dictionaries
-                        JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
+                        JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodeCuboidFunction(cubeName,
                                 sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
                         mergingSegs.add(newEcoddedRdd);
                     }
@@ -235,7 +235,7 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
         }
     }
 
-    static class ReEncodCuboidFunction implements PairFunction<Tuple2<Text, Text>, Text, Object[]> {
+    static class ReEncodeCuboidFunction implements PairFunction<Tuple2<Text, Text>, Text, Object[]> {
         private transient volatile boolean initialized = false;
         private String cubeName;
         private String sourceSegmentId;
@@ -244,10 +244,9 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
         private SerializableConfiguration conf;
         private transient KylinConfig kylinConfig;
         private transient SegmentReEncoder segmentReEncoder = null;
-        private transient Pair<Text, Object[]> encodedPari = null;
 
-        ReEncodCuboidFunction(String cubeName, String sourceSegmentId, String mergedSegmentId, String metaUrl,
-                SerializableConfiguration conf) {
+        ReEncodeCuboidFunction(String cubeName, String sourceSegmentId, String mergedSegmentId, String metaUrl,
+                               SerializableConfiguration conf) {
             this.cubeName = cubeName;
             this.sourceSegmentId = sourceSegmentId;
             this.mergedSegmentId = mergedSegmentId;
@@ -267,15 +266,15 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
         @Override
         public Tuple2<Text, Object[]> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
             if (initialized == false) {
-                synchronized (ReEncodCuboidFunction.class) {
+                synchronized (ReEncodeCuboidFunction.class) {
                     if (initialized == false) {
                         init();
                         initialized = true;
                     }
                 }
             }
-            encodedPari = segmentReEncoder.reEncode2(textTextTuple2._1, textTextTuple2._2);
-            return new Tuple2(encodedPari.getFirst(), encodedPari.getSecond());
+            Pair<Text, Object[]> encodedPair = segmentReEncoder.reEncode2(textTextTuple2._1, textTextTuple2._2);
+            return new Tuple2(encodedPair.getFirst(), encodedPair.getSecond());
         }
     }
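
Note: besides correcting the spelling to ReEncodeCuboidFunction, this change drops the encodedPari instance field and keeps each re-encoded pair in a local variable. Holding a per-call result in a mutable field of a Spark function object is risky: if the function is applied again before an earlier result has been fully consumed (or from another thread), the earlier result is silently overwritten. A tiny illustrative demo of that hazard (hypothetical classes, unrelated to Kylin):

    import java.util.function.Function;

    public class LocalVsFieldDemo {
        private final StringBuilder shared = new StringBuilder();          // risky: one buffer for all calls

        Function<String, CharSequence> risky = s -> {
            shared.setLength(0);
            shared.append(s);
            return shared;                // later calls rewrite what earlier callers still hold
        };

        Function<String, CharSequence> safe = s -> new StringBuilder(s);   // fresh object per call

        public static void main(String[] args) {
            LocalVsFieldDemo demo = new LocalVsFieldDemo();
            CharSequence first = demo.risky.apply("first");
            demo.risky.apply("second");
            System.out.println(first);                     // prints "second" -- the first result was clobbered
            System.out.println(demo.safe.apply("first"));  // prints "first"
        }
    }
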
 
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index c6d1f62..4fdc68e 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -266,22 +266,24 @@ public class ITKylinQueryTest extends KylinTestBase {
 
     @Test
     public void testDistinctCountQuery() throws Exception {
-        if ("left".equalsIgnoreCase(joinType)) {
-            batchExecuteQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_distinct");
-        }
+        batchExecuteQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_distinct");
     }
 
     @Test
     public void testTopNQuery() throws Exception {
-        if ("left".equalsIgnoreCase(joinType)) {
-            this.execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_topn", null, true);
-        }
+        this.execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_topn", null, true);
     }
 
     @Test
     public void testPreciselyDistinctCountQuery() throws Exception {
-        if ("left".equalsIgnoreCase(joinType)) {
-            execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_distinct_precisely", null, true);
+        execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_distinct_precisely", null, true);
+    }
+
+    @Test
+    public void testPreciselyDistinctCountRollupQuery() throws Exception {
+        // the "inner" test cube uses "SegmentAppendTrieDictBuilder" which doesn't support rollup.
+        if("left".equalsIgnoreCase(joinType)) {
+            execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_distinct_precisely_rollup", null, true);
         }
     }
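
Note on the test changes: the distinct-count, TopN and precise distinct-count suites now run for both join types instead of only for left joins, and a new sql_distinct_precisely_rollup suite is added, gated on the left-join cube because (per the in-line comment) the inner-join cube's SegmentAppendTrieDictBuilder dictionaries do not support rollup. Correspondingly, query03/query04 under sql_distinct_precisely switch their grouping from week_beg_dt to cal_dt, while the copies under sql_distinct_precisely_rollup keep the week_beg_dt grouping -- presumably because the week-level grouping is what forces the precise distinct-count measure to be rolled up across days. If the gated case should show up as skipped rather than silently passing, JUnit's assumption mechanism is the usual idiom; a hedged sketch (the joinType source and names here are hypothetical, not what the commit does):

    import static org.junit.Assume.assumeTrue;

    import org.junit.Test;

    public class RollupQueryTestSketch {
        private final String joinType = System.getProperty("kylin.it.joinType", "left");

        @Test
        public void testPreciselyDistinctCountRollupQuery() throws Exception {
            assumeTrue("rollup is not supported by the inner-join cube's dictionary builder",
                    "left".equalsIgnoreCase(joinType));
            // execAndCompQuery(".../sql_distinct_precisely_rollup", null, true);
        }
    }
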
 
diff --git a/kylin-it/src/test/resources/query/sql_distinct_precisely/query03.sql b/kylin-it/src/test/resources/query/sql_distinct_precisely/query03.sql
index 2be8963..7434769 100644
--- a/kylin-it/src/test/resources/query/sql_distinct_precisely/query03.sql
+++ b/kylin-it/src/test/resources/query/sql_distinct_precisely/query03.sql
@@ -16,7 +16,7 @@
 -- limitations under the License.
 --
 
-select test_cal_dt.week_beg_dt,sum(test_kylin_fact.price) as GMV
+select test_cal_dt.cal_dt,sum(test_kylin_fact.price) as GMV
  , count(1) as TRANS_CNT
  , count(distinct TEST_COUNT_DISTINCT_BITMAP) as user_count
  , count(distinct site_name) as site_count
@@ -31,5 +31,5 @@ select test_cal_dt.week_beg_dt,sum(test_kylin_fact.price) as GMV
  inner JOIN edw.test_seller_type_dim as test_seller_type_dim
  on test_kylin_fact.slr_segment_cd = test_seller_type_dim.seller_type_cd
  where test_kylin_fact.lstg_format_name='FP-GTC'
- and test_cal_dt.week_beg_dt between DATE '2013-05-01' and DATE '2013-08-01'
- group by test_cal_dt.week_beg_dt
+ and test_cal_dt.cal_dt between DATE '2013-05-01' and DATE '2013-08-01'
+ group by test_cal_dt.cal_dt
diff --git a/kylin-it/src/test/resources/query/sql_distinct_precisely/query04.sql b/kylin-it/src/test/resources/query/sql_distinct_precisely/query04.sql
index 7306824..649c758 100644
--- a/kylin-it/src/test/resources/query/sql_distinct_precisely/query04.sql
+++ b/kylin-it/src/test/resources/query/sql_distinct_precisely/query04.sql
@@ -16,7 +16,7 @@
 -- limitations under the License.
 --
 
-select test_cal_dt.week_beg_dt,sum(test_kylin_fact.price) as GMV
+select test_cal_dt.cal_dt,sum(test_kylin_fact.price) as GMV
  , count(1) as TRANS_CNT
  , count(distinct TEST_COUNT_DISTINCT_BITMAP) as user_count
  , count(distinct site_name) as site_count
@@ -31,6 +31,6 @@ select test_cal_dt.week_beg_dt,sum(test_kylin_fact.price) as GMV
  inner JOIN edw.test_seller_type_dim as test_seller_type_dim
  on test_kylin_fact.slr_segment_cd = test_seller_type_dim.seller_type_cd
  where test_kylin_fact.lstg_format_name='FP-GTC'
- and test_cal_dt.week_beg_dt between DATE '2013-05-01' and DATE '2013-08-01'
- group by test_cal_dt.week_beg_dt
+ and test_cal_dt.cal_dt between DATE '2013-05-01' and DATE '2013-08-01'
+ group by test_cal_dt.cal_dt
  having count(distinct seller_id) > 2
diff --git a/kylin-it/src/test/resources/query/sql_distinct_precisely/query03.sql b/kylin-it/src/test/resources/query/sql_distinct_precisely_rollup/query03.sql
similarity index 97%
copy from kylin-it/src/test/resources/query/sql_distinct_precisely/query03.sql
copy to kylin-it/src/test/resources/query/sql_distinct_precisely_rollup/query03.sql
index 2be8963..1bbea87 100644
--- a/kylin-it/src/test/resources/query/sql_distinct_precisely/query03.sql
+++ b/kylin-it/src/test/resources/query/sql_distinct_precisely_rollup/query03.sql
@@ -32,4 +32,4 @@ select test_cal_dt.week_beg_dt,sum(test_kylin_fact.price) as GMV
  on test_kylin_fact.slr_segment_cd = test_seller_type_dim.seller_type_cd
  where test_kylin_fact.lstg_format_name='FP-GTC'
  and test_cal_dt.week_beg_dt between DATE '2013-05-01' and DATE '2013-08-01'
- group by test_cal_dt.week_beg_dt
+ group by test_cal_dt.week_beg_dt
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_distinct_precisely/query03.sql b/kylin-it/src/test/resources/query/sql_distinct_precisely_rollup/query04.sql
similarity index 97%
copy from kylin-it/src/test/resources/query/sql_distinct_precisely/query03.sql
copy to kylin-it/src/test/resources/query/sql_distinct_precisely_rollup/query04.sql
index 2be8963..6f29947 100644
--- a/kylin-it/src/test/resources/query/sql_distinct_precisely/query03.sql
+++ b/kylin-it/src/test/resources/query/sql_distinct_precisely_rollup/query04.sql
@@ -33,3 +33,4 @@ select test_cal_dt.week_beg_dt,sum(test_kylin_fact.price) as GMV
  where test_kylin_fact.lstg_format_name='FP-GTC'
  and test_cal_dt.week_beg_dt between DATE '2013-05-01' and DATE '2013-08-01'
  group by test_cal_dt.week_beg_dt
+ having count(distinct seller_id) > 2
\ No newline at end of file