Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/12 09:28:10 UTC

[kylin] 14/17: KYLIN-5397 Support sum_lc function

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 60ad2aa86de528209b898c2d2bb343bcec1c3d7e
Author: Bowen Song <bo...@kyligence.io>
AuthorDate: Thu Nov 10 11:23:54 2022 +0800

    KYLIN-5397 Support sum_lc function
    
    * Support sum_lc function
    * Remove wrap sum_lc type
    * Remove rewrite logic and unnecessary checks
    * Fix part of the unit tests
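    
    For context: judging from the merge semantics of SumLCCounter in this
    patch, sum_lc takes a value column and a timestamp column and, per
    group, sums the values that carry the group's latest timestamp. A
    hedged usage sketch against the sample table this patch adds
    (SSB.SUMLC_EXTEND_4X, matching the "sumlc_decimal_tx_date" measure):
    
        SELECT ACCOUNT, SUM_LC(DECIMAL_DATA, TX_DATE)
        FROM SSB.SUMLC_EXTEND_4X
        GROUP BY ACCOUNT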
---
 .../kylin/rest/service/AccessServiceTest.java      |   9 +-
 .../kylin/rest/service/AclTCRServiceTest.java      |   4 +-
 .../java/org/apache/kylin/common/msg/Message.java  |   2 +-
 .../org/apache/kylin/common/util/DateFormat.java   |  29 ++-
 .../apache/kylin/common/util/DateFormatTest.java   |  51 ++++
 .../apache/kylin/measure/MeasureTypeFactory.java   |   2 +
 .../apache/kylin/measure/sumlc/SumLCCounter.java   |  96 ++++++++
 .../kylin/measure/sumlc/SumLCMeasureType.java      | 172 +++++++++++++
 .../apache/kylin/metadata/datatype/DataType.java   |  10 +-
 .../apache/kylin/metadata/model/FunctionDesc.java  |  68 ++++--
 .../metadata/model/util/FunctionDescTest.java      |  12 +-
 .../metadata/project/NProjectManagerTest.java      |   5 +-
 .../localmeta/metadata/_global/project/sum_lc.json |  35 +++
 .../f35f2937-9e4d-347a-7465-d64df939e7d6.json      |  13 +
 .../f35f2937-9e4d-347a-7465-d64df939e7d6.json      |  43 ++++
 .../f35f2937-9e4d-347a-7465-d64df939e7d6.json      | 267 +++++++++++++++++++++
 .../metadata/sum_lc/table/SSB.SUMLC_EXTEND_4X.json |  78 ++++++
 .../kylin/rest/service/ProjectServiceTest.java     |   8 +-
 .../kylin/query/relnode/KapAggregateRel.java       |  21 +-
 .../kylin/query/relnode/OLAPAggregateRel.java      |   2 +-
 .../rest/service/QueryHistoryServiceTest.java      |   2 +-
 .../kylin/query/engine/QueryRoutingEngine.java     |  15 ++
 .../kylin/query/engine/AsyncQueryJobTest.java      |   6 +-
 .../kylin/query/engine/QueryRoutingEngineTest.java |  16 ++
 .../kylin/engine/spark/job/CuboidAggregator.scala  |  41 ++--
 .../kylin/query/runtime/plan/AggregatePlan.scala   |  11 +-
 .../scala/org/apache/spark/sql/KapFunctions.scala  |  14 +-
 .../sql/catalyst/expressions/ExpressionUtils.scala |  31 ++-
 .../sql/catalyst/expressions/KapExpresssions.scala |  64 ++++-
 .../org/apache/spark/sql/udf/SparderAggFun.scala   |  11 +-
 .../apache/spark/sql/LayoutEntityConverter.scala   |   6 +-
 .../spark/sql/udaf/NullSafeValueSerializer.scala   |  30 ++-
 .../scala/org/apache/spark/sql/udaf/SumLC.scala    | 191 +++++++++++++++
 33 files changed, 1256 insertions(+), 109 deletions(-)

diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java
index fb102f63de..b0f9142756 100644
--- a/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java
@@ -48,6 +48,7 @@ import org.apache.kylin.common.persistence.AclEntity;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.user.ManagedUser;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.request.AccessRequest;
 import org.apache.kylin.rest.request.GlobalAccessRequest;
@@ -98,8 +99,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-import org.apache.kylin.metadata.user.ManagedUser;
-
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ SpringContext.class, UserGroupInformation.class, KylinConfig.class, NProjectManager.class })
 public class AccessServiceTest extends NLocalFileMetadataTestCase {
@@ -597,14 +596,14 @@ public class AccessServiceTest extends NLocalFileMetadataTestCase {
     @Test
     public void testGetGrantedProjectsOfUser() throws IOException {
         List<String> result = accessService.getGrantedProjectsOfUser("ADMIN");
-        assertEquals(27, result.size());
+        assertEquals(28, result.size());
     }
 
     @Test
     public void testGetGrantedProjectsOfUserOrGroup() throws IOException {
         // admin user
         List<String> result = accessService.getGrantedProjectsOfUserOrGroup("ADMIN", true);
-        assertEquals(27, result.size());
+        assertEquals(28, result.size());
 
         // normal user
         result = accessService.getGrantedProjectsOfUserOrGroup("ANALYST", true);
@@ -786,7 +785,7 @@ public class AccessServiceTest extends NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testAclWithUnNaturalOrderUpdate() throws IOException{
+    public void testAclWithUnNaturalOrderUpdate() throws IOException {
         AclEntity ae = accessService.getAclEntity(AclEntityType.PROJECT_INSTANCE,
                 "1eaca32a-a33e-4b69-83dd-0bb8b1f8c91b");
 
diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java
index bb994174a9..9a0ee62d7d 100644
--- a/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java
@@ -311,7 +311,7 @@ public class AclTCRServiceTest extends NLocalFileMetadataTestCase {
     }
 
     private SensitiveDataMask.MaskType getColumnDataMask(AclTCRRequest acl, String database, String table,
-                                                         String column) {
+            String column) {
         if (acl.getDatabaseName().equals(database)) {
             for (val tb : acl.getTables()) {
                 if (tb.getTableName().equals(table)) {
@@ -1316,7 +1316,7 @@ public class AclTCRServiceTest extends NLocalFileMetadataTestCase {
         Mockito.when(userService.isGlobalAdmin("ADMIN")).thenReturn(true);
         List<SidPermissionWithAclResponse> responses = accessService.getUserOrGroupAclPermissions(projects, "ADMIN",
                 true);
-        Assert.assertEquals(27, responses.size());
+        Assert.assertEquals(28, responses.size());
         Assert.assertTrue(responses.stream().allMatch(response -> "ADMIN".equals(response.getProjectPermission())));
 
         // test normal group
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
index 365474db28..2048ce9bc1 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
@@ -1015,7 +1015,7 @@ public class Message {
     }
 
     public String getInvalidTimeFormat() {
-        return "Can’t set the time partition column. The values of the selected column is not time formatted. Please select again.";
+        return "Can’t set the time partition column. The values of the selected column is not time formatted: {%s}. Please select again.";
     }
 
     public String getSegmentMergeStorageCheckError() {
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java b/src/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
index e5391e3b16..fd8dd243fa 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
@@ -64,6 +64,10 @@ public class DateFormat {
     public static final String DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS = "yyyy-MM-dd HH:mm:ss.SSS";
     public static final String DEFAULT_DATETIME_PATTERN_WITH_TIMEZONE = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
 
+    private static final int NANOS_TIMESTAMP_LENGTH = 16;
+    private static final int MILLIS_TIMESTAMP_LENGTH = 13;
+    private static final int SECONDS_TIMESTAMP_LENGTH = 10;
+
     static final private Map<String, FastDateFormat> formatMap = new ConcurrentHashMap<String, FastDateFormat>();
 
     private static final Map<String, String> dateFormatRegex = Maps.newHashMap();
@@ -210,13 +214,26 @@ public class DateFormat {
                 return stringToDate(str, regexToPattern.getValue()).getTime();
         }
 
-        // try parse it as days to epoch
         try {
-            long daysToEpoch = Long.parseLong(str);
-            return daysToEpoch * 24 * 60 * 60 * 1000;
+            long strToDigit = Long.parseLong(str);
+            if (strToDigit > 0) {
+                if (str.length() == NANOS_TIMESTAMP_LENGTH) {
+                    return strToDigit / 1000;
+                } else if (str.length() == MILLIS_TIMESTAMP_LENGTH) {
+                    return strToDigit;
+                } else if (str.length() == SECONDS_TIMESTAMP_LENGTH) {
+                    return strToDigit * 1000;
+                } else {
+                    // try parse it as days to epoch
+                    return strToDigit * 24 * 60 * 60 * 1000;
+                }
+            }
         } catch (NumberFormatException e) {
+            throw new KylinException(INVALID_TIME_PARTITION_COLUMN,
+                    String.format(Locale.ROOT, MsgPicker.getMsg().getInvalidTimeFormat(), str), e);
         }
-        throw new KylinException(INVALID_TIME_PARTITION_COLUMN, MsgPicker.getMsg().getInvalidTimeFormat());
+        throw new KylinException(INVALID_TIME_PARTITION_COLUMN,
+                String.format(Locale.ROOT, MsgPicker.getMsg().getInvalidTimeFormat(), str));
     }
 
     public static boolean isSupportedDateFormat(String dateStr) {
@@ -244,7 +261,8 @@ public class DateFormat {
             if (sampleData.matches(patternMap.getKey()))
                 return patternMap.getValue();
         }
-        throw new KylinException(INVALID_TIME_PARTITION_COLUMN, MsgPicker.getMsg().getInvalidTimeFormat());
+        throw new KylinException(INVALID_TIME_PARTITION_COLUMN,
+                String.format(Locale.ROOT, MsgPicker.getMsg().getInvalidTimeFormat(), sampleData));
     }
 
     /**
@@ -291,4 +309,5 @@ public class DateFormat {
                 || DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS.equals(format)
                 || DEFAULT_DATETIME_PATTERN_WITH_TIMEZONE.equals(format));
     }
+
 }
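
For readers skimming the hunk above: stringToMillis now disambiguates pure
numeric strings by digit count instead of always treating them as days to
epoch. A standalone Java sketch of that heuristic (assumed method name, not
the patch's code; note that the 16-digit case corresponds to microsecond
precision, although the patch names its constant NANOS_TIMESTAMP_LENGTH):

    // Digit-length heuristic for numeric strings; the patch applies it
    // only when the parsed value is positive, otherwise a KylinException
    // carrying the offending value is thrown.
    static long numericStringToMillis(String str) {
        long v = Long.parseLong(str);
        switch (str.length()) {
        case 16: return v / 1000;                 // microseconds, e.g. "1669824000000000"
        case 13: return v;                        // milliseconds, e.g. "1669824000000"
        case 10: return v * 1000;                 // seconds, e.g. "1669824000"
        default: return v * 24 * 60 * 60 * 1000;  // fall back to days since epoch
        }
    }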
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/util/DateFormatTest.java b/src/core-common/src/test/java/org/apache/kylin/common/util/DateFormatTest.java
index 9f5364e8cc..0387906235 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/util/DateFormatTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/util/DateFormatTest.java
@@ -24,12 +24,14 @@ import java.time.format.DateTimeParseException;
 import java.util.Arrays;
 import java.util.Date;
 
+import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.junit.annotation.MultiTimezoneTest;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 import org.junit.Assert;
+import org.junit.Test;
 
 /**
  * Created by dongli on 1/4/16.
@@ -189,4 +191,53 @@ public class DateFormatTest {
         }
 
     }
+
+    @Test
+    public void testStringToMillis() {
+        // 2022-12-01 00:00:00
+        long expectedMillis = 1669824000000L;
+
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01 00:00"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("202212"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01 00:00:00"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01 00:00:00:000"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01 00:00:00:000"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01 00:00"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01 00:00:00.000"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01 00:00:00:000"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201 00:00"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01 00:00"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201 00:00:00:000"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201 00:00:00"));
+    }
+
+    @Test
+    public void testStringToMillisSupplement() {
+        long expectedMillis = 1669824000000L;
+
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01 00:00:00"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201T00:00:00.000Z"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01T00:00:00.000Z"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01 00:00:00"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201 00:00:00.000"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01T00:00:00.000Z"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01T00:00:00.000Z"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01 00:00:00.000"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01T00:00:00.000+08:00"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01 00:00:00.000"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("1669824000000000"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("1669824000000"));
+        Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("1669824000"));
+    }
+
+    @Test
+    public void testUnsupportedStringToMillis() {
+        Assert.assertThrows(KylinException.class, () -> DateFormat.stringToMillis("12/01"));
+        Assert.assertThrows(KylinException.class, () -> DateFormat.stringToMillis("-12345"));
+    }
 }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/src/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
index ff812fcabf..48344c876e 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
@@ -34,6 +34,7 @@ import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType;
 import org.apache.kylin.measure.hllc.HLLCMeasureType;
 import org.apache.kylin.measure.percentile.PercentileMeasureType;
 import org.apache.kylin.measure.raw.RawMeasureType;
+import org.apache.kylin.measure.sumlc.SumLCMeasureType;
 import org.apache.kylin.measure.topn.TopNMeasureType;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeSerializer;
@@ -120,6 +121,7 @@ abstract public class MeasureTypeFactory<T> {
         factoryInsts.add(new IntersectMeasureType.Factory());
         factoryInsts.add(new CollectSetMeasureType.Factory());
         factoryInsts.add(new CorrMeasureType.Factory());
+        factoryInsts.add(new SumLCMeasureType.Factory());
 
         logger.info("Checking custom measure types from kylin config");
 
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCCounter.java b/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCCounter.java
new file mode 100644
index 0000000000..dfca82230b
--- /dev/null
+++ b/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCCounter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.sumlc;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import com.google.common.collect.Maps;
+
+public class SumLCCounter implements Serializable {
+    private static final Map<String, BiFunction<Number, Number, Number>> MERGE_FUNC_MAP = Maps.newHashMap();
+
+    static {
+        MERGE_FUNC_MAP.put(Long.class.getSimpleName(), (s1, s2) -> Long.sum((Long) s1, (Long) s2));
+        MERGE_FUNC_MAP.put(Double.class.getSimpleName(), (s1, s2) -> Double.sum((Double) s1, (Double) s2));
+        MERGE_FUNC_MAP.put(BigDecimal.class.getSimpleName(), (s1, s2) -> ((BigDecimal) s1).add((BigDecimal) s2));
+    }
+
+    Number sumLC;
+    Long timestamp;
+
+    public SumLCCounter() {
+
+    }
+
+    public SumLCCounter(Number sumLC, Long timestamp) {
+        this.sumLC = numericTypeConversion(sumLC);
+        this.timestamp = timestamp;
+    }
+
+    public static SumLCCounter merge(SumLCCounter current, Number sumLC, Long timestamp) {
+        SumLCCounter merged = new SumLCCounter(sumLC, timestamp);
+        return merge(current, merged);
+    }
+
+    public static SumLCCounter merge(SumLCCounter value1, SumLCCounter value2) {
+        if (value1 == null || value1.timestamp == null)
+            return value2;
+        if (value2 == null || value2.timestamp == null)
+            return value1;
+        if (value2.timestamp > value1.timestamp) {
+            return value2;
+        } else if (value1.timestamp > value2.timestamp) {
+            return value1;
+        } else {
+            return mergeSum(value1, value2);
+        }
+    }
+
+    private static SumLCCounter mergeSum(SumLCCounter cnt1, SumLCCounter cnt2) {
+        if (cnt1.sumLC == null)
+            return cnt2;
+        if (cnt2.sumLC == null)
+            return cnt1;
+        String sumLCTypeName = cnt1.sumLC.getClass().getSimpleName();
+        Number semiSum = MERGE_FUNC_MAP.get(sumLCTypeName).apply(cnt1.sumLC, cnt2.sumLC);
+        return new SumLCCounter(semiSum, cnt1.timestamp);
+    }
+
+    private static Number numericTypeConversion(Number input) {
+        if (input instanceof Byte || input instanceof Short || input instanceof Integer) {
+            return input.longValue();
+        } else if (input instanceof Float) {
+            return input.doubleValue();
+        } else {
+            return input;
+        }
+    }
+
+    public Number getSumLC() {
+        return sumLC;
+    }
+
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+}
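
To make the counter's contract concrete, a small hedged usage sketch
against the class above (the values are made up):

    SumLCCounter nov30  = new SumLCCounter(100L, 20221130L);
    SumLCCounter dec01a = new SumLCCounter(25L, 20221201L);
    SumLCCounter dec01b = new SumLCCounter(17L, 20221201L);

    SumLCCounter merged = SumLCCounter.merge(SumLCCounter.merge(nov30, dec01a), dec01b);
    // The later timestamp wins outright; equal timestamps are summed:
    // merged.getTimestamp() == 20221201L
    // merged.getSumLC()     == 42L   (25 + 17; the 100 from Nov 30 is discarded)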
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCMeasureType.java b/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCMeasureType.java
new file mode 100644
index 0000000000..c94c29e54d
--- /dev/null
+++ b/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCMeasureType.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.sumlc;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.ImmutableMap;
+
+public class SumLCMeasureType extends MeasureType<SumLCCounter> {
+    public static final String FUNC_SUM_LC = "SUM_LC";
+    public static final String DATATYPE_SUM_LC = "sum_lc";
+    static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.of(SumLCMeasureType.FUNC_SUM_LC, SumLCAggFunc.class);
+
+    public SumLCMeasureType(String funcName, DataType dataType) {
+
+    }
+
+    @Override
+    public MeasureIngester<SumLCCounter> newIngester() {
+        return new MeasureIngester<SumLCCounter>() {
+            @Override
+            public SumLCCounter valueOf(String[] values, MeasureDesc measureDesc,
+                    Map<TblColRef, Dictionary<String>> dictionaryMap) {
+                return null;
+            }
+        };
+    }
+
+    @Override
+    public MeasureAggregator<SumLCCounter> newAggregator() {
+        return new MeasureAggregator<SumLCCounter>() {
+            @Override
+            public void reset() {
+                // left over issue, default implementation ignored
+            }
+
+            @Override
+            public void aggregate(SumLCCounter value) {
+                // left over issue, default implementation ignored
+            }
+
+            @Override
+            public SumLCCounter aggregate(SumLCCounter value1, SumLCCounter value2) {
+                return null;
+            }
+
+            @Override
+            public SumLCCounter getState() {
+                return null;
+            }
+
+            @Override
+            public int getMemBytesEstimate() {
+                return 0;
+            }
+        };
+    }
+
+    @Override
+    public boolean needRewrite() {
+        return true;
+    }
+
+    @Override
+    public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
+        return UDAF_MAP;
+    }
+
+    public static class Factory extends MeasureTypeFactory<SumLCCounter> {
+
+        @Override
+        public MeasureType<SumLCCounter> createMeasureType(String funcName, DataType dataType) {
+            return new SumLCMeasureType(funcName, dataType);
+        }
+
+        @Override
+        public String getAggrFunctionName() {
+            return FUNC_SUM_LC;
+        }
+
+        @Override
+        public String getAggrDataTypeName() {
+            return DATATYPE_SUM_LC;
+        }
+
+        @Override
+        public Class<? extends DataTypeSerializer<SumLCCounter>> getAggrDataTypeSerializer() {
+            return SumLCSerializer.class;
+        }
+    }
+
+    /**
+     * This class is used for registering sum_lc to calcite schema, no need to implement the functions
+     */
+    public static class SumLCAggFunc {
+
+        public static SumLCCounter init() {
+            return null;
+        }
+
+        public static SumLCCounter add(SumLCCounter cur, Object v, Object r) {
+            return null;
+        }
+
+        public static SumLCCounter merge(SumLCCounter counter0, SumLCCounter counter1) {
+            return null;
+        }
+
+        public static Object result(SumLCCounter counter) {
+            return null;
+        }
+    }
+
+    public static class SumLCSerializer extends DataTypeSerializer<SumLCCounter> {
+
+        public SumLCSerializer(DataType dataType) {
+
+        }
+
+        @Override
+        public void serialize(SumLCCounter value, ByteBuffer out) {
+            // left over issue, default implementation ignored
+        }
+
+        @Override
+        public SumLCCounter deserialize(ByteBuffer in) {
+            return null;
+        }
+
+        @Override
+        public int peekLength(ByteBuffer in) {
+            return 0;
+        }
+
+        @Override
+        public int maxLength() {
+            return 0;
+        }
+
+        @Override
+        public int getStorageBytesEstimate() {
+            return 0;
+        }
+    }
+}
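
Note that the ingester, aggregator, and serializer above are intentional
stubs returning null or zero: as the class comment on SumLCAggFunc says,
this class mainly registers the SUM_LC function and the sum_lc data type
with the measure framework and Calcite. Per the diffstat, the working
aggregation path lives in the Spark layer (SumLC.scala and
CuboidAggregator.scala).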
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
index f9ef1bc103..ce7081c1bc 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
@@ -171,9 +171,9 @@ public class DataType implements Serializable {
     }
 
     public static DataType getType(String type) {
-        if (type == null)
+        if (type == null) {
             return null;
-
+        }
         DataType dataType = new DataType(type);
         DataType cached = CACHE.get(dataType);
         if (cached == null) {
@@ -409,10 +409,13 @@ public class DataType implements Serializable {
     public static final BytesSerializer<DataType> serializer = new BytesSerializer<DataType>() {
         @Override
         public void serialize(DataType value, ByteBuffer out) {
+            serializeDataType(value, out);
+        }
+
+        private void serializeDataType(DataType value, ByteBuffer out) {
             BytesUtil.writeUTFString(value.name, out);
             BytesUtil.writeVInt(value.precision, out);
             BytesUtil.writeVInt(value.scale, out);
-
         }
 
         @Override
@@ -420,7 +423,6 @@ public class DataType implements Serializable {
             String name = BytesUtil.readUTFString(in);
             int precision = BytesUtil.readVInt(in);
             int scale = BytesUtil.readVInt(in);
-
             return new DataType(name, precision, scale);
         }
     };
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index c66ed0ce8e..b2a212f84f 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -43,6 +43,7 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.directory.api.util.Strings;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureTypeFactory;
@@ -55,6 +56,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -91,21 +93,31 @@ public class FunctionDesc implements Serializable {
 
     public static String proposeReturnType(String expression, String colDataType, Map<String, String> override,
             boolean saveCheck) {
-        String returnType = override.getOrDefault(expression,
-                EXPRESSION_DEFAULT_TYPE_MAP.getOrDefault(expression, colDataType));
-        if (saveCheck && colDataType != null && DataType.getType(colDataType).isStringFamily()) {
+        if (saveCheck) {
             switch (expression) {
             case FunctionDesc.FUNC_SUM:
-            case FunctionDesc.FUNC_PERCENTILE:
-                throw new KylinException(INVALID_MEASURE_DATA_TYPE,
-                        String.format(Locale.ROOT, "Invalid column type %s for measure %s", colDataType, expression));
+            case FunctionDesc.FUNC_PERCENTILE: {
+                if (colDataType != null && DataType.getType(colDataType).isStringFamily()) {
+                    throw new KylinException(INVALID_MEASURE_DATA_TYPE, String.format(Locale.ROOT,
+                            "Invalid column type %s for measure %s", colDataType, expression));
+                }
+                break;
+            }
+            case FunctionDesc.FUNC_SUM_LC: {
+                Preconditions.checkArgument(Strings.isNotEmpty(colDataType),
+                        "SUM_LC Measure's input type shouldn't be null or empty");
+                checkSumLCDataType(colDataType);
+                break;
+            }
             default:
                 break;
             }
         }
 
-        switch (expression) {
-        case FunctionDesc.FUNC_SUM:
+        String returnType = override.getOrDefault(expression,
+                EXPRESSION_DEFAULT_TYPE_MAP.getOrDefault(expression, colDataType));
+        // widen return type for sum or sum_lc measure
+        if (FunctionDesc.FUNC_SUM.equals(expression) || FunctionDesc.FUNC_SUM_LC.equals(expression)) {
             if (colDataType != null) {
                 DataType type = DataType.getType(returnType);
                 if (type.isIntegerFamily()) {
@@ -119,13 +131,19 @@ public class FunctionDesc implements Serializable {
             } else {
                 returnType = "decimal(19,4)";
             }
-            break;
-        default:
-            break;
         }
         return returnType;
     }
 
+    private static void checkSumLCDataType(String dataTypeName) {
+        DataType dataType = DataType.getType(dataTypeName);
+        if (!dataType.isNumberFamily()) {
+            throw new KylinException(INVALID_MEASURE_DATA_TYPE,
+                    String.format(Locale.ROOT, "SUM_LC Measure's return type '%s' is illegal. It must be one of %s",
+                            dataType, DataType.NUMBER_FAMILY));
+        }
+    }
+
     public static final String FUNC_SUM = "SUM";
     public static final String FUNC_MIN = "MIN";
     public static final String FUNC_MAX = "MAX";
@@ -147,6 +165,7 @@ public class FunctionDesc implements Serializable {
     public static final String FUNC_PERCENTILE = "PERCENTILE_APPROX";
     public static final String FUNC_GROUPING = "GROUPING";
     public static final String FUNC_TOP_N = "TOP_N";
+    public static final String FUNC_SUM_LC = "SUM_LC";
     public static final ImmutableSet<String> DIMENSION_AS_MEASURES = ImmutableSet.<String> builder()
             .add(FUNC_MAX, FUNC_MIN, FUNC_COUNT_DISTINCT).build();
     public static final ImmutableSet<String> NOT_SUPPORTED_FUNCTION = ImmutableSet.<String> builder().build();
@@ -195,19 +214,30 @@ public class FunctionDesc implements Serializable {
         for (ParameterDesc p : getParameters()) {
             if (p.isColumnType()) {
                 TblColRef colRef = model.findColumn(p.getValue());
-                returnDataType = DataType.getType(
-                        proposeReturnType(expression, colRef.getDatatype(), Maps.newHashMap(), model.isSaveCheck()));
                 p.setValue(colRef.getIdentity());
                 p.setColRef(colRef);
+                if (expression.equals(FUNC_SUM_LC)) {
+                    if (Objects.isNull(returnDataType)) {
+                        // use the first column to init returnType and returnDataType, ignore the second timestamp column
+                        returnType = proposeReturnType(expression, colRef.getDatatype(), Maps.newHashMap(),
+                                model.isSaveCheck());
+                        returnDataType = DataType.getType(returnType);
+                    }
+                } else {
+                    returnDataType = DataType.getType(proposeReturnType(expression, colRef.getDatatype(),
+                            Maps.newHashMap(), model.isSaveCheck()));
+                }
             }
         }
-        if (returnDataType == null) {
-            returnDataType = DataType.getType(BIGINT);
-        }
-        if (!StringUtils.isEmpty(returnType)) {
-            returnDataType = DataType.getType(returnType);
+        if (!expression.equals(FUNC_SUM_LC)) {
+            if (returnDataType == null) {
+                returnDataType = DataType.getType(BIGINT);
+            }
+            if (!StringUtils.isEmpty(returnType)) {
+                returnDataType = DataType.getType(returnType);
+            }
+            returnType = returnDataType.toString();
         }
-        returnType = returnDataType.toString();
     }
 
     private void reInitMeasureType() {
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/util/FunctionDescTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/util/FunctionDescTest.java
index 226b8ac3b7..77ed4f6ecd 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/util/FunctionDescTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/util/FunctionDescTest.java
@@ -26,14 +26,14 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
-import org.apache.kylin.metadata.model.NDataModel;
-import org.apache.kylin.metadata.model.NDataModelManager;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -86,6 +86,12 @@ public class FunctionDescTest extends NLocalFileMetadataTestCase {
             Assert.fail();
         } catch (KylinException ignored) {
         }
+        Assert.assertThrows(IllegalArgumentException.class,
+                () -> FunctionDesc.proposeReturnType("SUM_LC", "", Maps.newHashMap(), true));
+        Assert.assertThrows(KylinException.class,
+                () -> FunctionDesc.proposeReturnType("SUM_LC", "char", Maps.newHashMap(), true));
+        String returnType = FunctionDesc.proposeReturnType("SUM_LC", "bigint", Maps.newHashMap(), true);
+        Assert.assertEquals("bigint", returnType);
     }
 
     @Test
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java
index d87805c140..0d2e6cbaf3 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java
@@ -26,10 +26,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
-import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.hystrix.NCircuitBreaker;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
-import org.apache.kylin.metadata.project.ProjectInstance;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -69,7 +68,7 @@ public class NProjectManagerTest extends NLocalFileMetadataTestCase {
         }
 
         val projects = projectManager.listAllProjects();
-        Assert.assertEquals(27, projects.size());
+        Assert.assertEquals(28, projects.size());
         Assert.assertTrue(projects.stream().noneMatch(p -> p.getName().equals("test")));
     }
 
diff --git a/src/examples/test_case_data/localmeta/metadata/_global/project/sum_lc.json b/src/examples/test_case_data/localmeta/metadata/_global/project/sum_lc.json
new file mode 100644
index 0000000000..2fdd47af4f
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/_global/project/sum_lc.json
@@ -0,0 +1,35 @@
+{
+  "uuid" : "d1ceb3a4-4d2c-27af-5b7d-92c8a4055776",
+  "last_modified" : 1667464991353,
+  "create_time" : 1667464991347,
+  "version" : "4.0.0.0",
+  "name" : "sum_lc",
+  "owner" : "ADMIN",
+  "status" : "ENABLED",
+  "create_time_utc" : 1667464991347,
+  "default_database" : "DEFAULT",
+  "description" : "",
+  "principal" : null,
+  "keytab" : null,
+  "maintain_model_type" : "MANUAL_MAINTAIN",
+  "override_kylin_properties" : {
+    "kylin.metadata.semi-automatic-mode" : "false",
+    "kylin.query.metadata.expose-computed-column" : "true",
+    "kylin.source.default" : "9"
+  },
+  "segment_config" : {
+    "auto_merge_enabled" : false,
+    "auto_merge_time_ranges" : [ "WEEK", "MONTH", "QUARTER", "YEAR" ],
+    "volatile_range" : {
+      "volatile_range_number" : 0,
+      "volatile_range_enabled" : false,
+      "volatile_range_type" : "DAY"
+    },
+    "retention_range" : {
+      "retention_range_number" : 1,
+      "retention_range_enabled" : false,
+      "retention_range_type" : "MONTH"
+    },
+    "create_empty_segment_enabled" : false
+  }
+}
\ No newline at end of file
diff --git a/src/examples/test_case_data/localmeta/metadata/sum_lc/dataflow/f35f2937-9e4d-347a-7465-d64df939e7d6.json b/src/examples/test_case_data/localmeta/metadata/sum_lc/dataflow/f35f2937-9e4d-347a-7465-d64df939e7d6.json
new file mode 100644
index 0000000000..f5f39f4889
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/sum_lc/dataflow/f35f2937-9e4d-347a-7465-d64df939e7d6.json
@@ -0,0 +1,13 @@
+{
+  "uuid" : "f35f2937-9e4d-347a-7465-d64df939e7d6",
+  "last_modified" : 1667467391143,
+  "create_time" : 1667465241650,
+  "version" : "4.0.0.0",
+  "status" : "ONLINE",
+  "last_status" : null,
+  "cost" : 50,
+  "query_hit_count" : 0,
+  "last_query_time" : 0,
+  "layout_query_hit_count" : { },
+  "segments" : [ ]
+}
\ No newline at end of file
diff --git a/src/examples/test_case_data/localmeta/metadata/sum_lc/index_plan/f35f2937-9e4d-347a-7465-d64df939e7d6.json b/src/examples/test_case_data/localmeta/metadata/sum_lc/index_plan/f35f2937-9e4d-347a-7465-d64df939e7d6.json
new file mode 100644
index 0000000000..9b29b1fd06
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/sum_lc/index_plan/f35f2937-9e4d-347a-7465-d64df939e7d6.json
@@ -0,0 +1,43 @@
+{
+  "uuid" : "f35f2937-9e4d-347a-7465-d64df939e7d6",
+  "last_modified" : 1667467391117,
+  "create_time" : 1667293061109,
+  "version" : "4.0.0.0",
+  "description" : null,
+  "rule_based_index" : {
+    "dimensions" : [ 1, 6 ],
+    "measures" : [ 100000, 100001, 100002, 100003, 100004, 100005, 100006, 100007, 100008 ],
+    "global_dim_cap" : null,
+    "aggregation_groups" : [ {
+      "includes" : [ 1, 6 ],
+      "measures" : [ 100000, 100001, 100002, 100003, 100004, 100005, 100006, 100007, 100008 ],
+      "select_rule" : {
+        "hierarchy_dims" : [ ],
+        "mandatory_dims" : [ ],
+        "joint_dims" : [ ]
+      },
+      "index_range" : "EMPTY"
+    } ],
+    "layout_id_mapping" : [ 30001, 40001, 50001 ],
+    "parent_forward" : 3,
+    "index_start_id" : 30000,
+    "last_modify_time" : 1667370467484,
+    "layout_black_list" : [ ],
+    "scheduler_version" : 2,
+    "index_update_enabled" : true,
+    "base_layout_enabled" : true
+  },
+  "indexes" : [ ],
+  "override_properties" : { },
+  "to_be_deleted_indexes" : [ ],
+  "auto_merge_time_ranges" : null,
+  "retention_range" : 0,
+  "engine_type" : 80,
+  "next_aggregation_index_id" : 60000,
+  "next_table_index_id" : 20000000000,
+  "agg_shard_by_columns" : [ ],
+  "extend_partition_columns" : [ ],
+  "layout_bucket_num" : { },
+  "approved_additional_recs" : 0,
+  "approved_removal_recs" : 0
+}
\ No newline at end of file
diff --git a/src/examples/test_case_data/localmeta/metadata/sum_lc/model_desc/f35f2937-9e4d-347a-7465-d64df939e7d6.json b/src/examples/test_case_data/localmeta/metadata/sum_lc/model_desc/f35f2937-9e4d-347a-7465-d64df939e7d6.json
new file mode 100644
index 0000000000..ddc2b999de
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/sum_lc/model_desc/f35f2937-9e4d-347a-7465-d64df939e7d6.json
@@ -0,0 +1,267 @@
+{
+  "uuid" : "f35f2937-9e4d-347a-7465-d64df939e7d6",
+  "last_modified" : 1667465241644,
+  "create_time" : 1667293060578,
+  "version" : "4.0.0.0",
+  "alias" : "sum_lc_multi_data_type_test",
+  "owner" : "ADMIN",
+  "config_last_modifier" : null,
+  "config_last_modified" : 0,
+  "description" : null,
+  "fact_table" : "SSB.SUMLC_EXTEND_4X",
+  "fact_table_alias" : null,
+  "management_type" : "MODEL_BASED",
+  "join_tables" : [ ],
+  "filter_condition" : "",
+  "partition_desc" : {
+    "partition_date_column" : "SUMLC_EXTEND_4X.TX_DATE",
+    "partition_date_start" : 0,
+    "partition_date_format" : "yyyy-MM-dd",
+    "partition_type" : "APPEND",
+    "partition_condition_builder" : "org.apache.kylin.metadata.model.PartitionDesc$DefaultPartitionConditionBuilder"
+  },
+  "capacity" : "MEDIUM",
+  "segment_config" : {
+    "auto_merge_enabled" : null,
+    "auto_merge_time_ranges" : null,
+    "volatile_range" : null,
+    "retention_range" : null,
+    "create_empty_segment_enabled" : false
+  },
+  "data_check_desc" : null,
+  "semantic_version" : 0,
+  "storage_type" : 0,
+  "model_type" : "BATCH",
+  "all_named_columns" : [ {
+    "id" : 0,
+    "name" : "INT_DATA",
+    "column" : "SUMLC_EXTEND_4X.INT_DATA"
+  }, {
+    "id" : 1,
+    "name" : "ACCOUNT",
+    "column" : "SUMLC_EXTEND_4X.ACCOUNT",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 2,
+    "name" : "TINYINT_DATA",
+    "column" : "SUMLC_EXTEND_4X.TINYINT_DATA"
+  }, {
+    "id" : 3,
+    "name" : "DOUBLE_DATA",
+    "column" : "SUMLC_EXTEND_4X.DOUBLE_DATA"
+  }, {
+    "id" : 4,
+    "name" : "SMALLINT_DATA",
+    "column" : "SUMLC_EXTEND_4X.SMALLINT_DATA"
+  }, {
+    "id" : 5,
+    "name" : "DECIMAL_DATA",
+    "column" : "SUMLC_EXTEND_4X.DECIMAL_DATA"
+  }, {
+    "id" : 6,
+    "name" : "TX_DATE",
+    "column" : "SUMLC_EXTEND_4X.TX_DATE",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 7,
+    "name" : "MILLIS",
+    "column" : "SUMLC_EXTEND_4X.MILLIS"
+  }, {
+    "id" : 8,
+    "name" : "FLOAT_DATA",
+    "column" : "SUMLC_EXTEND_4X.FLOAT_DATA"
+  }, {
+    "id" : 9,
+    "name" : "BIGINT_DATA",
+    "column" : "SUMLC_EXTEND_4X.BIGINT_DATA"
+  }, {
+    "id" : 10,
+    "name" : "TINYINT_CC",
+    "column" : "SUMLC_EXTEND_4X.TINYINT_CC"
+  } ],
+  "all_measures" : [ {
+    "name" : "COUNT_ALL",
+    "function" : {
+      "expression" : "COUNT",
+      "parameters" : [ {
+        "type" : "constant",
+        "value" : "1"
+      } ],
+      "returntype" : "bigint"
+    },
+    "column" : null,
+    "comment" : null,
+    "id" : 100000,
+    "type" : "NORMAL",
+    "internal_ids" : [ ]
+  }, {
+    "name" : "sumlc_decimal_tx_date",
+    "function" : {
+      "expression" : "SUM_LC",
+      "parameters" : [ {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.DECIMAL_DATA"
+      }, {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.TX_DATE"
+      } ],
+      "returntype" : "decimal(19,7)"
+    },
+    "column" : null,
+    "comment" : "",
+    "id" : 100001,
+    "type" : "NORMAL",
+    "internal_ids" : [ ]
+  }, {
+    "name" : "sumlc_tinyint_tx_date",
+    "function" : {
+      "expression" : "SUM_LC",
+      "parameters" : [ {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.TINYINT_DATA"
+      }, {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.TX_DATE"
+      } ],
+      "returntype" : "bigint"
+    },
+    "column" : null,
+    "comment" : "",
+    "id" : 100002,
+    "type" : "NORMAL",
+    "internal_ids" : [ ]
+  }, {
+    "name" : "sumlc_smallint_tx_date",
+    "function" : {
+      "expression" : "SUM_LC",
+      "parameters" : [ {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.SMALLINT_DATA"
+      }, {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.TX_DATE"
+      } ],
+      "returntype" : "bigint"
+    },
+    "column" : null,
+    "comment" : "",
+    "id" : 100003,
+    "type" : "NORMAL",
+    "internal_ids" : [ ]
+  }, {
+    "name" : "sumlc_int_tx_date",
+    "function" : {
+      "expression" : "SUM_LC",
+      "parameters" : [ {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.INT_DATA"
+      }, {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.TX_DATE"
+      } ],
+      "returntype" : "bigint"
+    },
+    "column" : null,
+    "comment" : "",
+    "id" : 100004,
+    "type" : "NORMAL",
+    "internal_ids" : [ ]
+  }, {
+    "name" : "sumlc_bigint_tx_date",
+    "function" : {
+      "expression" : "SUM_LC",
+      "parameters" : [ {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.BIGINT_DATA"
+      }, {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.TX_DATE"
+      } ],
+      "returntype" : "bigint"
+    },
+    "column" : null,
+    "comment" : "",
+    "id" : 100005,
+    "type" : "NORMAL",
+    "internal_ids" : [ ]
+  }, {
+    "name" : "sumlc_float_tx_date",
+    "function" : {
+      "expression" : "SUM_LC",
+      "parameters" : [ {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.FLOAT_DATA"
+      }, {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.TX_DATE"
+      } ],
+      "returntype" : "double"
+    },
+    "column" : null,
+    "comment" : "",
+    "id" : 100006,
+    "type" : "NORMAL",
+    "internal_ids" : [ ]
+  }, {
+    "name" : "sumlc_double_tx_date",
+    "function" : {
+      "expression" : "SUM_LC",
+      "parameters" : [ {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.DOUBLE_DATA"
+      }, {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.TX_DATE"
+      } ],
+      "returntype" : "double"
+    },
+    "column" : null,
+    "comment" : "",
+    "id" : 100007,
+    "type" : "NORMAL",
+    "internal_ids" : [ ]
+  }, {
+    "name" : "sumlc_tinyint_cc_tx_date",
+    "function" : {
+      "expression" : "SUM_LC",
+      "parameters" : [ {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.TINYINT_CC"
+      }, {
+        "type" : "column",
+        "value" : "SUMLC_EXTEND_4X.TX_DATE"
+      } ],
+      "returntype" : "bigint"
+    },
+    "column" : null,
+    "comment" : "",
+    "id" : 100008,
+    "type" : "NORMAL",
+    "internal_ids" : [ ]
+  } ],
+  "recommendations_count" : 0,
+  "computed_columns" : [ {
+    "tableIdentity" : "SSB.SUMLC_EXTEND_4X",
+    "tableAlias" : "SUMLC_EXTEND_4X",
+    "columnName" : "TINYINT_CC",
+    "expression" : "SUMLC_EXTEND_4X.TINYINT_DATA * 2",
+    "innerExpression" : "`SUMLC_EXTEND_4X`.`TINYINT_DATA` * 2",
+    "datatype" : "INTEGER",
+    "comment" : null,
+    "rec_uuid" : null
+  } ],
+  "canvas" : {
+    "coordinate" : {
+      "SUMLC_EXTEND_4X" : {
+        "x" : 307.9999966091586,
+        "y" : 27.444441053602443,
+        "width" : 200.0,
+        "height" : 522.2222222222221
+      }
+    },
+    "zoom" : 9.0
+  },
+  "multi_partition_desc" : null,
+  "multi_partition_key_mapping" : null,
+  "fusion_id" : null
+}
\ No newline at end of file
diff --git a/src/examples/test_case_data/localmeta/metadata/sum_lc/table/SSB.SUMLC_EXTEND_4X.json b/src/examples/test_case_data/localmeta/metadata/sum_lc/table/SSB.SUMLC_EXTEND_4X.json
new file mode 100644
index 0000000000..85880c9a43
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/sum_lc/table/SSB.SUMLC_EXTEND_4X.json
@@ -0,0 +1,78 @@
+{
+  "uuid" : "5c5334e5-44ec-d8a3-0357-a63d84b56b06",
+  "last_modified" : 0,
+  "create_time" : 1667465028069,
+  "version" : "4.0.0.0",
+  "name" : "SUMLC_EXTEND_4X",
+  "columns" : [ {
+    "id" : "1",
+    "name" : "TX_DATE",
+    "datatype" : "date",
+    "case_sensitive_name" : "tx_date"
+  }, {
+    "id" : "2",
+    "name" : "MILLIS",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "millis"
+  }, {
+    "id" : "3",
+    "name" : "ACCOUNT",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "account"
+  }, {
+    "id" : "4",
+    "name" : "TINYINT_DATA",
+    "datatype" : "tinyint",
+    "case_sensitive_name" : "tinyint_data"
+  }, {
+    "id" : "5",
+    "name" : "SMALLINT_DATA",
+    "datatype" : "smallint",
+    "case_sensitive_name" : "smallint_data"
+  }, {
+    "id" : "6",
+    "name" : "INT_DATA",
+    "datatype" : "integer",
+    "case_sensitive_name" : "int_data"
+  }, {
+    "id" : "7",
+    "name" : "BIGINT_DATA",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "bigint_data"
+  }, {
+    "id" : "8",
+    "name" : "FLOAT_DATA",
+    "datatype" : "double",
+    "case_sensitive_name" : "float_data"
+  }, {
+    "id" : "9",
+    "name" : "DOUBLE_DATA",
+    "datatype" : "double",
+    "case_sensitive_name" : "double_data"
+  }, {
+    "id" : "10",
+    "name" : "DECIMAL_DATA",
+    "datatype" : "decimal(9,7)",
+    "case_sensitive_name" : "decimal_data"
+  } ],
+  "source_type" : 9,
+  "table_type" : "MANAGED",
+  "top" : false,
+  "increment_loading" : false,
+  "last_snapshot_path" : null,
+  "last_snapshot_size" : 0,
+  "snapshot_last_modified" : 0,
+  "query_hit_count" : 0,
+  "partition_column" : null,
+  "snapshot_partitions" : { },
+  "snapshot_partitions_info" : { },
+  "snapshot_total_rows" : 0,
+  "snapshot_partition_col" : null,
+  "selected_snapshot_partition_col" : null,
+  "temp_snapshot_path" : null,
+  "snapshot_has_broken" : false,
+  "database" : "SSB",
+  "transactional" : false,
+  "rangePartition" : false,
+  "partition_desc" : null
+}
\ No newline at end of file
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
index 61720f727c..34160d76c6 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
@@ -56,6 +56,7 @@ import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
 import org.apache.kylin.query.pushdown.PushDownRunnerSparkImpl;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.request.GarbageCleanUpConfigRequest;
@@ -97,7 +98,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import io.kyligence.kap.clickhouse.MockSecondStorage;
-import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
 import lombok.val;
 import lombok.var;
 import lombok.extern.slf4j.Slf4j;
@@ -235,14 +235,14 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase {
     public void testGetReadableProjects() {
         Mockito.doReturn(true).when(aclEvaluate).hasProjectAdminPermission(Mockito.any(ProjectInstance.class));
         List<ProjectInstance> projectInstances = projectService.getReadableProjects("", false);
-        Assert.assertEquals(27, projectInstances.size());
+        Assert.assertEquals(28, projectInstances.size());
     }
 
     @Test
     public void testGetAdminProjects() throws Exception {
         Mockito.doReturn(true).when(aclEvaluate).hasProjectAdminPermission(Mockito.any(ProjectInstance.class));
         List<ProjectInstance> projectInstances = projectService.getAdminProjects();
-        Assert.assertEquals(27, projectInstances.size());
+        Assert.assertEquals(28, projectInstances.size());
     }
 
     @Test
@@ -256,7 +256,7 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase {
     public void testGetReadableProjectsHasNoPermissionProject() {
         Mockito.doReturn(true).when(aclEvaluate).hasProjectAdminPermission(Mockito.any(ProjectInstance.class));
         List<ProjectInstance> projectInstances = projectService.getReadableProjects("", false);
-        Assert.assertEquals(27, projectInstances.size());
+        Assert.assertEquals(28, projectInstances.size());
 
     }
 
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java
index bb004b174d..73c50d3d64 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java
@@ -46,13 +46,13 @@ import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.measure.corr.CorrMeasureType;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
 import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MultiPartitionDesc;
 import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.query.util.ICutContextStrategy;
 
 import com.google.common.collect.ImmutableList;
@@ -65,7 +65,7 @@ import com.google.common.collect.Sets;
 public class KapAggregateRel extends OLAPAggregateRel implements KapRel {
 
     protected static final List<String> supportedFunction = Lists.newArrayList("SUM", "MIN", "MAX", "COUNT_DISTINCT",
-            "BITMAP_UUID", "PERCENTILE_APPROX", FunctionDesc.FUNC_BITMAP_BUILD);
+            "BITMAP_UUID", "PERCENTILE_APPROX", FunctionDesc.FUNC_BITMAP_BUILD, FunctionDesc.FUNC_SUM_LC);
     private ImmutableList<Integer> rewriteGroupKeys; // preserve the ordering of group keys after CC replacement
     private List<ImmutableBitSet> rewriteGroupSets; // group sets with cc replaced
     List<AggregateCall> aggregateCalls;
@@ -203,8 +203,8 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel {
             TblColRef originalColumn = inputColumnRowType.getColumnByIndex(i);
             if (null != this.context && this.context.getGroupCCColRewriteMapping().containsKey(originalColumn)) {
                 groups.add(this.context.getGroupCCColRewriteMapping().get(originalColumn));
-                groupKeys
-                        .add(inputColumnRowType.getIndexByName(this.context.getGroupCCColRewriteMapping().get(originalColumn).getName()));
+                groupKeys.add(inputColumnRowType
+                        .getIndexByName(this.context.getGroupCCColRewriteMapping().get(originalColumn).getName()));
             } else {
                 Set<TblColRef> sourceColumns = inputColumnRowType.getSourceColumnsByIndex(i);
                 groups.addAll(sourceColumns);
@@ -307,7 +307,6 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel {
         // rebuild rowType & columnRowType
         this.rowType = this.deriveRowType();
         this.columnRowType = this.buildColumnRowType();
-
     }
 
     private Boolean isExactlyMatched() {
@@ -330,7 +329,8 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel {
             return false;
         }
 
-        if (!checkAggCall()) return false;
+        if (!checkAggCall())
+            return false;
         Set<String> cuboidDimSet = new HashSet<>();
         if (getContext() != null && getContext().storageContext.getCandidate() != null) {
             cuboidDimSet = getContext().storageContext.getCandidate().getLayoutEntity().getOrderedDimensions().values()
@@ -403,7 +403,7 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel {
     private boolean isDimExactlyMatch(Set<String> groupByCols, Set<String> cuboidDimSet) {
         return groupByCols.equals(cuboidDimSet) && isSimpleGroupType()
                 && (this.context.getInnerGroupByColumns().isEmpty()
-                || !this.context.getGroupCCColRewriteMapping().isEmpty());
+                        || !this.context.getGroupCCColRewriteMapping().isEmpty());
 
     }
 
@@ -502,7 +502,8 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel {
     }
 
     public boolean isContainCountDistinct() {
-        return aggregateCalls.stream().anyMatch(agg -> agg.getAggregation().getKind() == SqlKind.COUNT && agg.isDistinct());
+        return aggregateCalls.stream()
+                .anyMatch(agg -> agg.getAggregation().getKind() == SqlKind.COUNT && agg.isDistinct());
     }
 
 }
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index e944266a82..d68329c0a0 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -101,6 +101,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
     protected List<AggregateCall> rewriteAggCalls;
     protected List<TblColRef> groups;
     protected List<FunctionDesc> aggregations;
+
     public OLAPAggregateRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator,
             ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls)
             throws InvalidRelException {
@@ -501,7 +502,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
 
         // rebuild aggregate call
         return new AggregateCall(newAgg, false, newArgList, fieldType, callName);
-
     }
 
     /**
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
index a53f9af8e7..c1f23bb5d7 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
@@ -365,7 +365,7 @@ public class QueryHistoryServiceTest extends NLocalFileMetadataTestCase {
 
         // get all tables
         tableMap = queryHistoryService.getQueryHistoryTableMap(null);
-        Assert.assertEquals(27, tableMap.size());
+        Assert.assertEquals(28, tableMap.size());
 
         // not existing project
         try {
diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
index fe1d092ee3..81439609e9 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
@@ -50,6 +50,7 @@ import org.apache.kylin.metadata.query.StructField;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
 import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException;
 import org.apache.kylin.query.engine.data.QueryResult;
+import org.apache.kylin.query.exception.NotSupportedSQLException;
 import org.apache.kylin.query.mask.QueryResultMasks;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.util.PushDownUtil;
@@ -81,6 +82,7 @@ public class QueryRoutingEngine {
         queryParams.setDefaultSchema(queryExec.getDefaultSchemaName());
 
         if (queryParams.isForcedToPushDown()) {
+            checkContainsSumLC(queryParams, null);
             return pushDownQuery(null, queryParams);
         }
 
@@ -128,6 +130,7 @@ public class QueryRoutingEngine {
             if (cause instanceof SQLException && cause.getCause() instanceof KylinException) {
                 throw (SQLException) cause;
             }
+            checkContainsSumLC(queryParams, e);
             if (shouldPushdown(cause, queryParams)) {
                 return pushDownQuery((SQLException) cause, queryParams);
             } else {
@@ -146,6 +149,7 @@ public class QueryRoutingEngine {
                     }
                 }
             }
+            checkContainsSumLC(queryParams, e);
             if (shouldPushdown(e, queryParams)) {
                 return pushDownQuery(e, queryParams);
             } else {
@@ -167,6 +171,17 @@ public class QueryRoutingEngine {
         return false;
     }
 
+    private void checkContainsSumLC(QueryParams queryParams, Throwable t) {
+        if (queryParams.getSql().contains("sum_lc")) {
+            String message = "There is no aggregate index to answer this query, and the sum_lc() function is not yet supported by any other query engine";
+            if (t != null) {
+                throw new NotSupportedSQLException(message, t);
+            } else {
+                throw new NotSupportedSQLException(message);
+            }
+        }
+    }
+
     private boolean shouldPushdown(Throwable e, QueryParams queryParams) {
         if (queryParams.isForcedToIndex()) {
             return false;
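Note that checkContainsSumLC is a plain, case-sensitive substring match on the SQL text, and it runs before the pushdown decision, so a forced-pushdown sum_lc query fails fast instead of reaching another engine. A caller-side sketch, assuming `engine` and `params` are prepared the same way as in the test added further below:

    // hedged sketch: `engine` and `params` are assumed to be set up elsewhere
    try {
      engine.queryWithSqlMassage(params) // params.isForcedToPushDown == true
    } catch {
      case e: NotSupportedSQLException =>
        // sum_lc can only be answered by an aggregate index, so pushdown is refused
        println(e.getMessage)
    }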
diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java
index 9a1725f1bc..93fe27494f 100644
--- a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java
@@ -17,11 +17,11 @@
  */
 package org.apache.kylin.query.engine;
 
-import static org.apache.kylin.query.util.AsyncQueryUtil.ASYNC_QUERY_JOB_ID_PRE;
 import static org.apache.kylin.metadata.cube.model.NBatchConstants.P_DIST_META_URL;
 import static org.apache.kylin.metadata.cube.model.NBatchConstants.P_JOB_ID;
 import static org.apache.kylin.metadata.cube.model.NBatchConstants.P_QUERY_CONTEXT;
 import static org.apache.kylin.metadata.cube.model.NBatchConstants.P_QUERY_PARAMS;
+import static org.apache.kylin.query.util.AsyncQueryUtil.ASYNC_QUERY_JOB_ID_PRE;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -45,11 +45,11 @@ import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.common.util.ShellException;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.query.util.QueryParams;
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -217,7 +217,7 @@ public class AsyncQueryJobTest extends NLocalFileMetadataTestCase {
                 rawResourceMap.put(zipEntry.getName(), raw);
             }
         }
-        Assert.assertEquals(83, rawResourceMap.size());
+        Assert.assertEquals(84, rawResourceMap.size());
     }
 
     private void testKylinConfig(FileSystem workingFileSystem, FileStatus metaFileStatus) throws IOException {
diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
index f929d50a39..7599095bf6 100644
--- a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
@@ -39,6 +39,7 @@ import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException;
 import org.apache.kylin.query.QueryExtension;
 import org.apache.kylin.query.engine.data.QueryResult;
+import org.apache.kylin.query.exception.NotSupportedSQLException;
 import org.apache.kylin.query.util.QueryParams;
 import org.apache.kylin.source.adhocquery.PushdownResult;
 import org.apache.spark.SparkException;
@@ -304,4 +305,19 @@ public class QueryRoutingEngineTest extends NLocalFileMetadataTestCase {
         QueryResult queryResult = queryRoutingEngine.queryWithSqlMassage(queryParams);
         Assert.assertEquals(0, queryResult.getSize());
     }
+
+    @Test
+    public void testQueryPushDownWithSumLC() {
+        final String sql = "select sum_lc(column, dateColumn) from success_table_2";
+        final String project = "default";
+        KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+        QueryParams queryParams = new QueryParams();
+        queryParams.setProject(project);
+        queryParams.setSql(sql);
+        queryParams.setKylinConfig(kylinconfig);
+        queryParams.setSelect(true);
+        queryParams.setForcedToPushDown(true);
+
+        Assert.assertThrows(NotSupportedSQLException.class, () -> queryRoutingEngine.queryWithSqlMassage(queryParams));
+    }
 }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
index e1f538bad8..f81e6f7d37 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
@@ -20,19 +20,19 @@ package org.apache.kylin.engine.spark.job
 
 import org.apache.kylin.common.KapConfig
 import org.apache.kylin.engine.spark.builder.DFBuilderHelper.ENCODE_SUFFIX
+import org.apache.kylin.measure.bitmap.BitmapMeasureType
+import org.apache.kylin.measure.hllc.HLLCMeasureType
 import org.apache.kylin.metadata.cube.cuboid.NSpanningTree
 import org.apache.kylin.metadata.cube.model.{NCubeJoinedFlatTableDesc, NDataSegment}
 import org.apache.kylin.metadata.model.NDataModel.Measure
-import org.apache.kylin.measure.bitmap.BitmapMeasureType
-import org.apache.kylin.measure.hllc.HLLCMeasureType
 import org.apache.kylin.metadata.model.TblColRef
-import org.apache.spark.sql.functions.{col, _}
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.udaf._
 import org.apache.spark.sql.util.SparderTypeUtil
 import org.apache.spark.sql.util.SparderTypeUtil.toSparkType
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.{Column, DataFrame}
 
 import java.util
 import java.util.Locale
@@ -60,20 +60,20 @@ object CuboidAggregator {
       dataset.schema.fieldNames.zipWithIndex.map(tp => (tp._2, tp._1)).toMap
 
     aggregate(dataset, dimensions, measures, //
-      (colRef: TblColRef) => columnIndex.apply(flatTableDesc.getColumnIndex(colRef)) )
+      (colRef: TblColRef) => columnIndex.apply(flatTableDesc.getColumnIndex(colRef)))
   }
 
   /**
-    * Avoid compilation error when invoking aggregate in java
-    * incompatible types: Function1 is not a functional interface
-    *
-    * @param dataset
-    * @param dimensions
-    * @param measures
-    * @param tableDesc
-    * @param isSparkSQL
-    * @return
-    */
+   * Avoid compilation error when invoking aggregate in java
+   * incompatible types: Function1 is not a functional interface
+   *
+   * @param dataset
+   * @param dimensions
+   * @param measures
+   * @param tableDesc
+   * @param isSparkSQL
+   * @return
+   */
   def aggregateJava(dataset: DataFrame,
                     dimensions: util.Set[Integer],
                     measures: util.Map[Integer, Measure],
@@ -223,6 +223,15 @@ object CuboidAggregator {
           }
         case "CORR" =>
           new Column(Literal(null, DoubleType)).as(measureEntry._1.toString)
+        case "SUM_LC" =>
+          val colDataType = function.getReturnDataType
+          val sparkDataType = toSparkType(colDataType)
+          if (reuseLayout) {
+            new Column(ReuseSumLC(columns.head.expr, sparkDataType).toAggregateExpression()).as(measureEntry._1.toString)
+          } else {
+            new Column(EncodeSumLC(columns.head.expr, columns.drop(1).head.expr, sparkDataType)
+              .toAggregateExpression()).as(measureEntry._1.toString)
+          }
       }
     }.toSeq
 
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala
index 8a2682d460..714b42d15d 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala
@@ -17,11 +17,11 @@
  */
 package org.apache.kylin.query.runtime.plan
 
-import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rex.RexLiteral
 import org.apache.calcite.sql.SqlKind
 import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.kylin.metadata.model.FunctionDesc
 import org.apache.kylin.query.relnode.{KapAggregateRel, KapProjectRel, KylinAggregateCall, OLAPAggregateRel}
 import org.apache.kylin.query.util.RuntimeHelper
@@ -43,7 +43,8 @@ import scala.collection.JavaConverters._
 // scalastyle:off
 object AggregatePlan extends LogEx {
   val binaryMeasureType =
-    List("PERCENTILE", "PERCENTILE_APPROX", "INTERSECT_COUNT", "COUNT_DISTINCT", "BITMAP_UUID", FunctionDesc.FUNC_BITMAP_BUILD)
+    List("PERCENTILE", "PERCENTILE_APPROX", "INTERSECT_COUNT", "COUNT_DISTINCT", "BITMAP_UUID",
+      FunctionDesc.FUNC_BITMAP_BUILD, FunctionDesc.FUNC_SUM_LC)
 
   def agg(inputs: java.util.List[DataFrame],
           rel: KapAggregateRel): DataFrame = logTime("aggregate", debug = true) {
@@ -83,6 +84,10 @@ object AggregatePlan extends LogEx {
             case FunctionDesc.FUNC_BITMAP_BUILD =>
               val aggName = SchemaProcessor.replaceToAggravateSchemaName(index, "BITMAP_BUILD_DECODE", hash, argNames: _*)
               KapFunctions.precise_bitmap_build_decode(columnName.head).alias(aggName)
+            case FunctionDesc.FUNC_SUM_LC =>
+              val aggName = SchemaProcessor.replaceToAggravateSchemaName(index, "SUM_LC_DECODE", hash, argNames: _*)
+              val sparkDataType = SparderTypeUtil.toSparkType(dataType)
+              KapFunctions.k_sum_lc_decode(columnName.head, sparkDataType.json).alias(aggName)
             case _ =>
               col(schemaNames.apply(call.getArgList.get(0)))
           }
@@ -185,6 +190,8 @@ object AggregatePlan extends LogEx {
         } else if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_PERCENTILE)) {
           require(columnName.size == 2, s"Input columns size ${columnName.size} don't equal to 2.")
           KapFunctions.k_percentile(columnName.head, columnName(1), dataType.getPrecision).alias(aggName)
+        } else if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_SUM_LC)) {
+          KapFunctions.k_sum_lc(columnName.head, SparderTypeUtil.toSparkType(dataType)).alias(aggName)
         } else {
           callUDF(registeredFuncName, columnName.toList: _*).alias(aggName)
         }
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala
index eae6251f1a..7d6d9e4cb2 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala
@@ -21,13 +21,11 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.ExpressionUtils.expression
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
-import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode, CeilDateTime, DictEncode, DictEncodeV3, Expression, ExpressionInfo, FloorDateTime, ImplicitCastInputTypes, In, KapAddMonths, KapSubtractMonths, Like, Literal, PercentileDecode, PreciseCountDistinctDecode, RLike, RoundBase, SplitPart, Sum0, TimestampAdd, TimestampDiff, Truncate}
-import org.apache.spark.sql.types.{ArrayType, BinaryType, DoubleType, LongType, StringType}
 import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode, CeilDateTime, DictEncode, EmptyRow, Expression, ExpressionInfo, FloorDateTime, ImplicitCastInputTypes, In, KapAddMonths, KapSubtractMonths, Like, Literal, PercentileDecode, PreciseCountDistinctDecode, RLike, RoundBase, SplitPart, Sum0, TimestampAdd, TimestampDiff, Truncate}
-import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType, DataType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType}
-import org.apache.spark.sql.udaf.{ApproxCountDistinct, IntersectCount, Percentile, PreciseBitmapBuildBase64Decode, PreciseBitmapBuildBase64WithIndex, PreciseBitmapBuildPushDown, PreciseCardinality, PreciseCountDistinct, PreciseCountDistinctAndArray, PreciseCountDistinctAndValue, ReusePreciseCountDistinct}
+import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode, CeilDateTime, DictEncode, DictEncodeV3, EmptyRow, Expression, ExpressionInfo, FloorDateTime, ImplicitCastInputTypes, In, KapAddMonths, KapSubtractMonths, Like, Literal, PercentileDecode, PreciseCountDistinctDecode, RLike, RoundBase, SplitPart, Sum0, SumLCDecode, TimestampAdd, TimestampDiff, Truncate}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.udaf._
 
 object KapFunctions {
 
@@ -62,6 +60,12 @@ object KapFunctions {
 
   def in(value: Expression, list: Seq[Expression]): Column = Column(In(value, list))
 
+  def k_sum_lc(measureCol: Column, wrapDataType: DataType): Column =
+    Column(ReuseSumLC(measureCol.expr, wrapDataType, wrapDataType).toAggregateExpression())
+
+  def k_sum_lc_decode(measureCol: Column, wrapDataType: String): Column =
+    Column(SumLCDecode(measureCol.expr, Literal(wrapDataType)))
+
   def k_percentile(head: Column, column: Column, precision: Int): Column =
     Column(Percentile(head.expr, precision, Some(column.expr), DoubleType).toAggregateExpression())
 
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
index ea49dac47e..321bece62c 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
@@ -22,8 +22,8 @@ import org.apache.kylin.measure.hllc.HLLCounter
 import org.apache.kylin.measure.percentile.PercentileSerializer
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.types.Decimal
-import org.apache.spark.sql.udaf.BitmapSerAndDeSerObj
+import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.sql.udaf.{BitmapSerAndDeSerObj, SumLCUtil}
 
 import java.nio.ByteBuffer
 import scala.reflect.ClassTag
@@ -117,15 +117,15 @@ object ExpressionUtils {
   }
 
   def approxCountDistinctDecodeHelper(bytes: Any, precision: Any): Long = {
-      val storageFormat = bytes.asInstanceOf[Array[Byte]]
-      val preciseValue = precision.asInstanceOf[Int]
-      if (storageFormat.nonEmpty) {
-        val counter = new HLLCounter(preciseValue)
-        counter.readRegisters(ByteBuffer.wrap(storageFormat))
-        counter.getCountEstimate
-      } else {
-        0L
-      }
+    val storageFormat = bytes.asInstanceOf[Array[Byte]]
+    val preciseValue = precision.asInstanceOf[Int]
+    if (storageFormat.nonEmpty) {
+      val counter = new HLLCounter(preciseValue)
+      counter.readRegisters(ByteBuffer.wrap(storageFormat))
+      counter.getCountEstimate
+    } else {
+      0L
+    }
   }
 
   def percentileDecodeHelper(bytes: Any, quantile: Any, precision: Any): Double = {
@@ -134,5 +134,12 @@ object ExpressionUtils {
     val counter = serializer.deserialize(ByteBuffer.wrap(arrayBytes))
     counter.getResultEstimateWithQuantileRatio(quantile.asInstanceOf[Decimal].toDouble)
   }
-}
 
+  def sumLCDecodeHelper(bytes: Any, wrapDataType: Any): Number = {
+    val arrayBytes = bytes.asInstanceOf[Array[Byte]]
+    val codec = SumLCUtil.getNumericNullSafeSerializerByDataType(DataType.fromJson(wrapDataType.toString))
+    val counter = SumLCUtil.decodeToSumLCCounter(arrayBytes, codec)
+    counter.getSumLC
+  }
+
+}
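sumLCDecodeHelper receives the wrapped type as a string because the type travels through the expression tree as a JSON literal (see k_sum_lc_decode); Spark data types round-trip losslessly through their JSON form, as this small illustration shows:

    import org.apache.spark.sql.types.{DataType, DecimalType}

    val dt: DataType = DecimalType(19, 4)
    val json = dt.json                    // the JSON string "decimal(19,4)", quotes included
    assert(DataType.fromJson(json) == dt) // parses back to an equal DataType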
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala
index 24e25c940f..ea28802be5 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala
@@ -125,7 +125,6 @@ case class KapSubtractMonths(a: Expression, b: Expression)
 
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.util.TypeUtils
-import org.apache.spark.sql.types._
 
 @ExpressionDescription(
   usage = "_FUNC_(expr) - Returns the sum calculated from values of a group. " +
@@ -772,4 +771,65 @@ case class PercentileDecode(bytes: Expression, quantile: Expression, precision:
     val newChildren = Seq(newFirst, newSecond, newThird)
     super.legacyWithNewChildren(newChildren)
   }
-}
\ No newline at end of file
+}
+
+case class SumLCDecode(bytes: Expression, wrapDataTypeExpr: Expression) extends BinaryExpression with ExpectsInputTypes {
+  override def left: Expression = bytes
+
+  override def right: Expression = wrapDataTypeExpr
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType, StringType)
+
+  def wrapDataType: DataType = DataType.fromJson(wrapDataTypeExpr.toString)
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val leftGen = left.genCode(ctx)
+    val rightGen = right.genCode(ctx)
+    val expressionUtils = ExpressionUtils.getClass.getName.stripSuffix("$")
+    val decimalUtil = classOf[Decimal].getName
+    val evalValue = ctx.freshName("evalValue")
+    val javaType = CodeGenerator.javaType(dataType)
+    val boxedJavaType = CodeGenerator.boxedType(javaType)
+    val commonCodeBlock =
+      code"""
+        ${leftGen.code}
+        ${rightGen.code}
+        Number $evalValue = $expressionUtils.sumLCDecodeHelper(${leftGen.value}, ${rightGen.value});
+        boolean ${ev.isNull} = $evalValue == null;
+        $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
+          """
+    val conditionCodeBlock = if (wrapDataType.isInstanceOf[DecimalType]) {
+      code"""
+        if(!${ev.isNull}) {
+            ${ev.value} = $decimalUtil.fromDecimal($evalValue);
+        }
+          """
+    } else {
+      code"""
+        if(!${ev.isNull}) {
+            ${ev.value} = ($boxedJavaType) $evalValue;
+        }
+          """
+    }
+    ev.copy(code = commonCodeBlock + conditionCodeBlock)
+  }
+
+  override protected def nullSafeEval(bytes: Any, wrapDataTypeExpr: Any): Any = {
+    val decodeVal = ExpressionUtils.sumLCDecodeHelper(bytes, wrapDataTypeExpr)
+    wrapDataType match {
+      case DecimalType() =>
+        Decimal.fromDecimal(decodeVal.asInstanceOf[java.math.BigDecimal])
+      case _ =>
+        decodeVal
+    }
+  }
+
+  override def dataType: DataType = wrapDataType
+
+  override def prettyName: String = "sum_lc_decode"
+
+  override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression = {
+    val newChildren = Seq(newLeft, newRight)
+    super.legacyWithNewChildren(newChildren)
+  }
+}
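SumLCDecode supports both the codegen and the interpreted evaluation paths; in both, the decimal case needs the extra Decimal.fromDecimal conversion because Spark's internal decimal differs from java.math.BigDecimal. A tiny construction sketch, where the JSON literal is exactly what k_sum_lc_decode passes in:

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.catalyst.expressions.{Literal, SumLCDecode}
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.DecimalType

    val dt = DecimalType(19, 4)
    // decodes a stored sum_lc binary back to decimal(19,4)
    val decoded = new Column(SumLCDecode(col("sum_lc_blob").expr, Literal(dt.json)))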
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/SparderAggFun.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/SparderAggFun.scala
index 422951291d..1397251368 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/SparderAggFun.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/SparderAggFun.scala
@@ -18,9 +18,6 @@
 
 package org.apache.spark.sql.udf
 
-import java.nio.ByteBuffer
-import java.util
-
 import com.google.common.collect.Maps
 import org.apache.kylin.measure.MeasureAggregator
 import org.apache.kylin.measure.bitmap.BitmapCounter
@@ -35,6 +32,9 @@ import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAg
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SparderTypeUtil
 
+import java.nio.ByteBuffer
+import java.util
+
 class SparderAggFun(funcName: String, dataTp: KyDataType)
   extends UserDefinedAggregateFunction
     with Logging {
@@ -89,7 +89,7 @@ class SparderAggFun(funcName: String, dataTp: KyDataType)
       byteBuffer = ByteBuffer.allocate(1024 * 1024)
     }
 
-     val initVal = if (isCount) {
+    val initVal = if (isCount) {
       // return 0 instead of null in case of no input
       measureAggregator.reset()
       byteBuffer.clear()
@@ -106,6 +106,7 @@ class SparderAggFun(funcName: String, dataTp: KyDataType)
   override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
     merge(buffer, input)
   }
+
   override def merge(buffer: MutableAggregationBuffer, input: Row): Unit = {
     if (!input.isNullAt(0)) {
       measureAggregator.reset()
@@ -167,7 +168,7 @@ class SparderAggFun(funcName: String, dataTp: KyDataType)
         case _ => null
       }
 
-     ret
+      ret
     }
   }
 
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/LayoutEntityConverter.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/LayoutEntityConverter.scala
index 528c8cb520..429cdfbbe9 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/LayoutEntityConverter.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/LayoutEntityConverter.scala
@@ -49,11 +49,11 @@ object LayoutEntityConverter {
         bucketSpec = Option(bucketSp))
     }
 
-    def toSchema() : StructType = {
+    def toSchema(): StructType = {
       genCuboidSchemaFromNCuboidLayout(layoutEntity)
     }
 
-    def toExactlySchema() : StructType = {
+    def toExactlySchema(): StructType = {
       genCuboidSchemaFromNCuboidLayout(layoutEntity, true)
     }
   }
@@ -122,6 +122,7 @@ object LayoutEntityConverter {
       genSparkStructField(i._1.toString, i._2)
     }.toSeq ++ measures)
   }
+
   def genBucketSpec(layoutEntity: LayoutEntity, partitionColumn: Set[String]): Option[BucketSpec] = {
     if (layoutEntity.getShardByColumns.isEmpty) {
       Option(BucketSpec(layoutEntity.getBucketNum,
@@ -161,6 +162,7 @@ object LayoutEntityConverter {
       case "COLLECT_SET" =>
         val parameter = function.getParameters.get(0)
         ArrayType(SparderTypeUtil.toSparkType(parameter.getColRef.getType))
+      case "SUM_LC" => BinaryType
       case _ => SparderTypeUtil.toSparkType(function.getReturnDataType)
     }
   }
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/NullSafeValueSerializer.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/NullSafeValueSerializer.scala
index 75a035a344..92fd5dd0d5 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/NullSafeValueSerializer.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/NullSafeValueSerializer.scala
@@ -18,11 +18,11 @@
 
 package org.apache.spark.sql.udaf
 
+import org.apache.spark.unsafe.types.UTF8String
+
 import java.io.{DataInput, DataOutput}
 import java.nio.charset.StandardCharsets
 
-import org.apache.spark.unsafe.types.UTF8String
-
 @SerialVersionUID(1)
 sealed trait NullSafeValueSerializer {
   final def serialize(output: DataOutput, value: Any): Unit = {
@@ -150,6 +150,29 @@ class StringSerializer extends NullSafeValueSerializer {
 class DecimalSerializer extends NullSafeValueSerializer {
   override def serialize0(output: DataOutput, value: Any): Unit = {
     val decimal = value.asInstanceOf[BigDecimal]
+    DecimalCodecUtil.encode(decimal, output)
+  }
+
+  override def deSerialize0(input: DataInput, length: Int): Any = {
+    DecimalCodecUtil.decode(input)
+  }
+}
+
+@SerialVersionUID(1)
+class JavaBigDecimalSerializer extends NullSafeValueSerializer {
+  override def serialize0(output: DataOutput, value: Any): Unit = {
+    val decimal = BigDecimal.apply(value.asInstanceOf[java.math.BigDecimal])
+    DecimalCodecUtil.encode(decimal, output)
+  }
+
+  override def deSerialize0(input: DataInput, length: Int): Any = {
+    val decimal = DecimalCodecUtil.decode(input)
+    decimal.asInstanceOf[BigDecimal].bigDecimal
+  }
+}
+
+object DecimalCodecUtil {
+  def encode(decimal: BigDecimal, output: DataOutput): Unit = {
     val bytes = decimal.toString().getBytes(StandardCharsets.UTF_8)
     output.writeInt(1 + bytes.length)
     output.writeByte(decimal.scale)
@@ -157,7 +180,7 @@ class DecimalSerializer extends NullSafeValueSerializer {
     output.write(bytes)
   }
 
-  override def deSerialize0(input: DataInput, length: Int): Any = {
+  def decode(input: DataInput): Any = {
     val scale = input.readByte()
     val length = input.readInt()
     val bytes = new Array[Byte](length)
@@ -166,4 +189,3 @@ class DecimalSerializer extends NullSafeValueSerializer {
     decimal.setScale(scale)
   }
 }
-
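JavaBigDecimalSerializer exists because SumLCCounter carries a java.math.BigDecimal rather than Scala's BigDecimal; both serializers now share the DecimalCodecUtil wire format (scale byte plus the UTF-8 string form). A hedged round-trip sketch using the Kryo-backed streams the sum_lc code already relies on:

    import com.esotericsoftware.kryo.io.{Input, KryoDataInput, KryoDataOutput, Output}
    import org.apache.spark.sql.udaf.JavaBigDecimalSerializer

    val ser = new JavaBigDecimalSerializer
    val output = new Output(new Array[Byte](256))
    ser.serialize(new KryoDataOutput(output), new java.math.BigDecimal("123.45"))

    val input = new KryoDataInput(new Input(output.toBytes))
    val back = ser.deserialize(input).asInstanceOf[java.math.BigDecimal]
    assert(back.compareTo(new java.math.BigDecimal("123.45")) == 0)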
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/SumLC.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/SumLC.scala
new file mode 100644
index 0000000000..bb68841e01
--- /dev/null
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/SumLC.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.udaf
+
+import com.esotericsoftware.kryo.KryoException
+import com.esotericsoftware.kryo.io.{Input, KryoDataInput, KryoDataOutput, Output}
+import org.apache.kylin.common.util.DateFormat
+import org.apache.kylin.measure.sumlc.SumLCCounter
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, TypedImperativeAggregate}
+import org.apache.spark.sql.types._
+
+/**
+ * Builds the sum_lc measure. There are two implementations:
+ * the non-reuse variant takes two inputs, one for the value column and one for the date column;
+ * the reuse variant constructs the measure from the parent layout.
+ *
+ * @param wrapDataType           data type used to serialize the result
+ * @param outputDataType         data type used to evaluate the result, see Percentile outputType
+ * @param mutableAggBufferOffset default value
+ * @param inputAggBufferOffset   default value
+ */
+sealed abstract class BaseSumLC(wrapDataType: DataType,
+                                outputDataType: DataType = BinaryType,
+                                mutableAggBufferOffset: Int = 0,
+                                inputAggBufferOffset: Int = 0)
+  extends TypedImperativeAggregate[SumLCCounter] with Serializable with Logging {
+  lazy val serializer: NullSafeValueSerializer = SumLCUtil.getNumericNullSafeSerializerByDataType(wrapDataType)
+
+  override def prettyName: String = "sum_lc"
+
+  override def eval(buffer: SumLCCounter): Any = {
+    outputDataType match {
+      case BinaryType =>
+        serialize(buffer)
+      case DecimalType() =>
+        if (buffer.getSumLC != null) {
+          Decimal.fromDecimal(buffer.getSumLC.asInstanceOf[java.math.BigDecimal])
+        } else {
+          Decimal.ZERO
+        }
+      case _ =>
+        buffer.getSumLC
+    }
+  }
+
+  override def createAggregationBuffer(): SumLCCounter = new SumLCCounter()
+
+  override def merge(buffer: SumLCCounter, input: SumLCCounter): SumLCCounter = {
+    SumLCCounter.merge(buffer, input)
+  }
+
+  override def serialize(buffer: SumLCCounter): Array[Byte] = {
+    val array: Array[Byte] = new Array[Byte](1024 * 1024)
+    val output: Output = new Output(array)
+    serialize(buffer, array, output)
+  }
+
+  private def serialize(buffer: SumLCCounter, array: Array[Byte], output: Output): Array[Byte] = {
+    try {
+      if (buffer == null) {
+        Array.empty[Byte]
+      } else {
+        output.clear()
+        val out = new KryoDataOutput(output)
+        serializer.serialize(out, buffer.getSumLC)
+        out.writeLong(buffer.getTimestamp)
+        val mark = output.position()
+        output.close()
+        array.slice(0, mark)
+      }
+    } catch {
+      case th: KryoException if th.getMessage.contains("Buffer overflow") =>
+        logWarning(s"Resizing serialization buffer to ${array.length * 2} bytes")
+        val updateArray = new Array[Byte](array.length * 2)
+        output.setBuffer(updateArray)
+        serialize(buffer, updateArray, output)
+      case th =>
+        throw th
+    }
+  }
+
+  override def deserialize(bytes: Array[Byte]): SumLCCounter = {
+    SumLCUtil.decodeToSumLCCounter(bytes, serializer)
+  }
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = outputDataType
+}
+
+case class EncodeSumLC(
+                        evalCol: Expression,
+                        dateCol: Expression,
+                        wrapDataType: DataType,
+                        outputDataType: DataType = BinaryType,
+                        mutableAggBufferOffset: Int = 0,
+                        inputAggBufferOffset: Int = 0)
+  extends BaseSumLC(wrapDataType, outputDataType, mutableAggBufferOffset, inputAggBufferOffset) {
+
+  override def update(buffer: SumLCCounter, input: InternalRow): SumLCCounter = {
+    val columnEvalVal = evalCol.eval(input)
+    val columnVal = columnEvalVal match {
+      case decimal: Decimal =>
+        decimal.toJavaBigDecimal
+      case _ =>
+        columnEvalVal.asInstanceOf[Number]
+    }
+    val dateValStr = String.valueOf(dateCol.eval(input)).trim
+    val timestampVal = DateFormat.stringToMillis(dateValStr)
+    SumLCCounter.merge(buffer, columnVal, timestampVal)
+  }
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def children: Seq[Expression] = Seq(evalCol, dateCol)
+
+  override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
+    super.legacyWithNewChildren(newChildren)
+}
+
+case class ReuseSumLC(measure: Expression,
+                      wrapDataType: DataType,
+                      outputDataType: DataType = BinaryType,
+                      mutableAggBufferOffset: Int = 0,
+                      inputAggBufferOffset: Int = 0)
+  extends BaseSumLC(wrapDataType, outputDataType, mutableAggBufferOffset, inputAggBufferOffset) {
+
+  override def update(buffer: SumLCCounter, input: InternalRow): SumLCCounter = {
+    val evalCounter = deserialize(measure.eval(input).asInstanceOf[Array[Byte]])
+    SumLCCounter.merge(buffer, evalCounter)
+  }
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def children: Seq[Expression] = Seq(measure)
+
+  override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
+    super.legacyWithNewChildren(newChildren)
+
+}
+
+object SumLCUtil extends Logging {
+
+  def decodeToSumLCCounter(bytes: Array[Byte], codec: NullSafeValueSerializer): SumLCCounter = {
+    if (bytes.nonEmpty) {
+      val in = new KryoDataInput(new Input(bytes))
+      val sumLC = codec.deserialize(in).asInstanceOf[Number]
+      val timestamp = in.readLong()
+      new SumLCCounter(sumLC, timestamp)
+    } else {
+      new SumLCCounter()
+    }
+  }
+
+  def getNumericNullSafeSerializerByDataType(dataType: org.apache.spark.sql.types.DataType): NullSafeValueSerializer = {
+    dataType match {
+      case LongType => new LongSerializer
+      case DoubleType => new DoubleSerializer
+      case DecimalType() => new JavaBigDecimalSerializer
+      case dt => throw new UnsupportedOperationException("Unsupported sum_lc dimension type: " + dt)
+    }
+  }
+
+}
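Putting the pieces together, a local sanity check of the new aggregate might look like the sketch below (the SparkSession setup and column names are hypothetical; the exact merge semantics live in SumLCCounter). The grouped result is the serialized SumLCCounter, which SumLCUtil decodes back on the driver:

    import org.apache.spark.sql.{Column, SparkSession}
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.LongType
    import org.apache.spark.sql.udaf.{EncodeSumLC, SumLCUtil}

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    val df = Seq(("a1", 100L, "2022-11-01"), ("a1", 120L, "2022-11-02"))
      .toDF("account", "balance", "dt")
    val sumLc = new Column(
      EncodeSumLC(col("balance").expr, col("dt").expr, LongType).toAggregateExpression())
    val blob = df.groupBy($"account").agg(sumLc.as("blob"))
      .head().getAs[Array[Byte]]("blob")

    val codec = SumLCUtil.getNumericNullSafeSerializerByDataType(LongType)
    println(SumLCUtil.decodeToSumLCCounter(blob, codec).getSumLC)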