Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/12 09:28:10 UTC
[kylin] 14/17: KYLIN-5397 Support sum_lc function
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 60ad2aa86de528209b898c2d2bb343bcec1c3d7e
Author: Bowen Song <bo...@kyligence.io>
AuthorDate: Thu Nov 10 11:23:54 2022 +0800
KYLIN-5397 Support sum_lc function
* Support sum_lc function
* Remove wrap sum_lc type
* Remove rewrite logic and unnecessary checks
* Fix part of the unit tests
---
.../kylin/rest/service/AccessServiceTest.java | 9 +-
.../kylin/rest/service/AclTCRServiceTest.java | 4 +-
.../java/org/apache/kylin/common/msg/Message.java | 2 +-
.../org/apache/kylin/common/util/DateFormat.java | 29 ++-
.../apache/kylin/common/util/DateFormatTest.java | 51 ++++
.../apache/kylin/measure/MeasureTypeFactory.java | 2 +
.../apache/kylin/measure/sumlc/SumLCCounter.java | 96 ++++++++
.../kylin/measure/sumlc/SumLCMeasureType.java | 172 +++++++++++++
.../apache/kylin/metadata/datatype/DataType.java | 10 +-
.../apache/kylin/metadata/model/FunctionDesc.java | 68 ++++--
.../metadata/model/util/FunctionDescTest.java | 12 +-
.../metadata/project/NProjectManagerTest.java | 5 +-
.../localmeta/metadata/_global/project/sum_lc.json | 35 +++
.../f35f2937-9e4d-347a-7465-d64df939e7d6.json | 13 +
.../f35f2937-9e4d-347a-7465-d64df939e7d6.json | 43 ++++
.../f35f2937-9e4d-347a-7465-d64df939e7d6.json | 267 +++++++++++++++++++++
.../metadata/sum_lc/table/SSB.SUMLC_EXTEND_4X.json | 78 ++++++
.../kylin/rest/service/ProjectServiceTest.java | 8 +-
.../kylin/query/relnode/KapAggregateRel.java | 21 +-
.../kylin/query/relnode/OLAPAggregateRel.java | 2 +-
.../rest/service/QueryHistoryServiceTest.java | 2 +-
.../kylin/query/engine/QueryRoutingEngine.java | 15 ++
.../kylin/query/engine/AsyncQueryJobTest.java | 6 +-
.../kylin/query/engine/QueryRoutingEngineTest.java | 16 ++
.../kylin/engine/spark/job/CuboidAggregator.scala | 41 ++--
.../kylin/query/runtime/plan/AggregatePlan.scala | 11 +-
.../scala/org/apache/spark/sql/KapFunctions.scala | 14 +-
.../sql/catalyst/expressions/ExpressionUtils.scala | 31 ++-
.../sql/catalyst/expressions/KapExpresssions.scala | 64 ++++-
.../org/apache/spark/sql/udf/SparderAggFun.scala | 11 +-
.../apache/spark/sql/LayoutEntityConverter.scala | 6 +-
.../spark/sql/udaf/NullSafeValueSerializer.scala | 30 ++-
.../scala/org/apache/spark/sql/udaf/SumLC.scala | 191 +++++++++++++++
33 files changed, 1256 insertions(+), 109 deletions(-)
diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java
index fb102f63de..b0f9142756 100644
--- a/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java
@@ -48,6 +48,7 @@ import org.apache.kylin.common.persistence.AclEntity;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.user.ManagedUser;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.request.AccessRequest;
import org.apache.kylin.rest.request.GlobalAccessRequest;
@@ -98,8 +99,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import org.apache.kylin.metadata.user.ManagedUser;
-
@RunWith(PowerMockRunner.class)
@PrepareForTest({ SpringContext.class, UserGroupInformation.class, KylinConfig.class, NProjectManager.class })
public class AccessServiceTest extends NLocalFileMetadataTestCase {
@@ -597,14 +596,14 @@ public class AccessServiceTest extends NLocalFileMetadataTestCase {
@Test
public void testGetGrantedProjectsOfUser() throws IOException {
List<String> result = accessService.getGrantedProjectsOfUser("ADMIN");
- assertEquals(27, result.size());
+ assertEquals(28, result.size());
}
@Test
public void testGetGrantedProjectsOfUserOrGroup() throws IOException {
// admin user
List<String> result = accessService.getGrantedProjectsOfUserOrGroup("ADMIN", true);
- assertEquals(27, result.size());
+ assertEquals(28, result.size());
// normal user
result = accessService.getGrantedProjectsOfUserOrGroup("ANALYST", true);
@@ -786,7 +785,7 @@ public class AccessServiceTest extends NLocalFileMetadataTestCase {
}
@Test
- public void testAclWithUnNaturalOrderUpdate() throws IOException{
+ public void testAclWithUnNaturalOrderUpdate() throws IOException {
AclEntity ae = accessService.getAclEntity(AclEntityType.PROJECT_INSTANCE,
"1eaca32a-a33e-4b69-83dd-0bb8b1f8c91b");
diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java
index bb994174a9..9a0ee62d7d 100644
--- a/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java
@@ -311,7 +311,7 @@ public class AclTCRServiceTest extends NLocalFileMetadataTestCase {
}
private SensitiveDataMask.MaskType getColumnDataMask(AclTCRRequest acl, String database, String table,
- String column) {
+ String column) {
if (acl.getDatabaseName().equals(database)) {
for (val tb : acl.getTables()) {
if (tb.getTableName().equals(table)) {
@@ -1316,7 +1316,7 @@ public class AclTCRServiceTest extends NLocalFileMetadataTestCase {
Mockito.when(userService.isGlobalAdmin("ADMIN")).thenReturn(true);
List<SidPermissionWithAclResponse> responses = accessService.getUserOrGroupAclPermissions(projects, "ADMIN",
true);
- Assert.assertEquals(27, responses.size());
+ Assert.assertEquals(28, responses.size());
Assert.assertTrue(responses.stream().allMatch(response -> "ADMIN".equals(response.getProjectPermission())));
// test normal group
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
index 365474db28..2048ce9bc1 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
@@ -1015,7 +1015,7 @@ public class Message {
}
public String getInvalidTimeFormat() {
- return "Can’t set the time partition column. The values of the selected column is not time formatted. Please select again.";
+ return "Can’t set the time partition column. The values of the selected column is not time formatted: {%s}. Please select again.";
}
public String getSegmentMergeStorageCheckError() {
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java b/src/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
index e5391e3b16..fd8dd243fa 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
@@ -64,6 +64,10 @@ public class DateFormat {
public static final String DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS = "yyyy-MM-dd HH:mm:ss.SSS";
public static final String DEFAULT_DATETIME_PATTERN_WITH_TIMEZONE = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+ private static final int NANOS_TIMESTAMP_LENGTH = 16;
+ private static final int MILLIS_TIMESTAMP_LENGTH = 13;
+ private static final int SECONDS_TIMESTAMP_LENGTH = 10;
+
static final private Map<String, FastDateFormat> formatMap = new ConcurrentHashMap<String, FastDateFormat>();
private static final Map<String, String> dateFormatRegex = Maps.newHashMap();
@@ -210,13 +214,26 @@ public class DateFormat {
return stringToDate(str, regexToPattern.getValue()).getTime();
}
- // try parse it as days to epoch
try {
- long daysToEpoch = Long.parseLong(str);
- return daysToEpoch * 24 * 60 * 60 * 1000;
+ long strToDigit = Long.parseLong(str);
+ if (strToDigit > 0) {
+ if (str.length() == NANOS_TIMESTAMP_LENGTH) {
+ return strToDigit / 1000;
+ } else if (str.length() == MILLIS_TIMESTAMP_LENGTH) {
+ return strToDigit;
+ } else if (str.length() == SECONDS_TIMESTAMP_LENGTH) {
+ return strToDigit * 1000;
+ } else {
+ // try parse it as days to epoch
+ return strToDigit * 24 * 60 * 60 * 1000;
+ }
+ }
} catch (NumberFormatException e) {
+ throw new KylinException(INVALID_TIME_PARTITION_COLUMN,
+ String.format(Locale.ROOT, MsgPicker.getMsg().getInvalidTimeFormat(), str), e);
}
- throw new KylinException(INVALID_TIME_PARTITION_COLUMN, MsgPicker.getMsg().getInvalidTimeFormat());
+ throw new KylinException(INVALID_TIME_PARTITION_COLUMN,
+ String.format(Locale.ROOT, MsgPicker.getMsg().getInvalidTimeFormat(), str));
}
public static boolean isSupportedDateFormat(String dateStr) {
@@ -244,7 +261,8 @@ public class DateFormat {
if (sampleData.matches(patternMap.getKey()))
return patternMap.getValue();
}
- throw new KylinException(INVALID_TIME_PARTITION_COLUMN, MsgPicker.getMsg().getInvalidTimeFormat());
+ throw new KylinException(INVALID_TIME_PARTITION_COLUMN,
+ String.format(Locale.ROOT, MsgPicker.getMsg().getInvalidTimeFormat(), sampleData));
}
/**
@@ -291,4 +309,5 @@ public class DateFormat {
|| DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS.equals(format)
|| DEFAULT_DATETIME_PATTERN_WITH_TIMEZONE.equals(format));
}
+
}
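Note on the DateFormat change above: when a partition column sample is a pure number, its length now decides the unit -- a 16-digit value is divided by 1,000, a 13-digit value is taken as milliseconds as-is, a 10-digit value is multiplied by 1,000, and any other positive number keeps the old days-to-epoch behaviour; values that match no pattern raise a KylinException that now echoes the offending input. A minimal sketch of calling it, using values taken from the new test below:

import org.apache.kylin.common.util.DateFormat;

public class NumericTimestampSketch {
    public static void main(String[] args) {
        // All three resolve to 1669824000000L (2022-12-01 00:00:00 in the test's timezone)
        System.out.println(DateFormat.stringToMillis("1669824000000000")); // 16 digits -> divided by 1000
        System.out.println(DateFormat.stringToMillis("1669824000000"));    // 13 digits -> taken as millis
        System.out.println(DateFormat.stringToMillis("1669824000"));       // 10 digits -> multiplied by 1000
        // Unparseable input now throws a KylinException carrying the bad value
        DateFormat.stringToMillis("-12345");
    }
}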
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/util/DateFormatTest.java b/src/core-common/src/test/java/org/apache/kylin/common/util/DateFormatTest.java
index 9f5364e8cc..0387906235 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/util/DateFormatTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/util/DateFormatTest.java
@@ -24,12 +24,14 @@ import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.Date;
+import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.junit.annotation.MultiTimezoneTest;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.Assert;
+import org.junit.Test;
/**
* Created by dongli on 1/4/16.
@@ -189,4 +191,53 @@ public class DateFormatTest {
}
}
+
+ @Test
+ public void testStringToMillis() {
+ // 2022-12-01 00:00:00
+ long expectedMillis = 1669824000000L;
+
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01 00:00"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("202212"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01 00:00:00"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01 00:00:00:000"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01 00:00:00:000"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01 00:00"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01 00:00:00.000"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01 00:00:00:000"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201 00:00"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01 00:00"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201 00:00:00:000"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201 00:00:00"));
+ }
+
+ @Test
+ public void testStringToMillisSupplement() {
+ long expectedMillis = 1669824000000L;
+
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01 00:00:00"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201T00:00:00.000Z"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01T00:00:00.000Z"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01 00:00:00"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201 00:00:00.000"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01T00:00:00.000Z"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01T00:00:00.000Z"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01 00:00:00.000"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01T00:00:00.000+08:00"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01 00:00:00.000"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("1669824000000000"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("1669824000000"));
+ Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("1669824000"));
+ }
+
+ @Test
+ public void testUnsupportedStringToMillis() {
+ Assert.assertThrows(KylinException.class, () -> DateFormat.stringToMillis("12/01"));
+ Assert.assertThrows(KylinException.class, () -> DateFormat.stringToMillis("-12345"));
+ }
}
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/src/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
index ff812fcabf..48344c876e 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
@@ -34,6 +34,7 @@ import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType;
import org.apache.kylin.measure.hllc.HLLCMeasureType;
import org.apache.kylin.measure.percentile.PercentileMeasureType;
import org.apache.kylin.measure.raw.RawMeasureType;
+import org.apache.kylin.measure.sumlc.SumLCMeasureType;
import org.apache.kylin.measure.topn.TopNMeasureType;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.datatype.DataTypeSerializer;
@@ -120,6 +121,7 @@ abstract public class MeasureTypeFactory<T> {
factoryInsts.add(new IntersectMeasureType.Factory());
factoryInsts.add(new CollectSetMeasureType.Factory());
factoryInsts.add(new CorrMeasureType.Factory());
+ factoryInsts.add(new SumLCMeasureType.Factory());
logger.info("Checking custom measure types from kylin config");
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCCounter.java b/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCCounter.java
new file mode 100644
index 0000000000..dfca82230b
--- /dev/null
+++ b/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCCounter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.sumlc;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import com.google.common.collect.Maps;
+
+public class SumLCCounter implements Serializable {
+ private static final Map<String, BiFunction<Number, Number, Number>> MERGE_FUNC_MAP = Maps.newHashMap();
+
+ static {
+ MERGE_FUNC_MAP.put(Long.class.getSimpleName(), (s1, s2) -> Long.sum((Long) s1, (Long) s2));
+ MERGE_FUNC_MAP.put(Double.class.getSimpleName(), (s1, s2) -> Double.sum((Double) s1, (Double) s2));
+ MERGE_FUNC_MAP.put(BigDecimal.class.getSimpleName(), (s1, s2) -> ((BigDecimal) s1).add((BigDecimal) s2));
+ }
+
+ Number sumLC;
+ Long timestamp;
+
+ public SumLCCounter() {
+
+ }
+
+ public SumLCCounter(Number sumLC, Long timestamp) {
+ this.sumLC = numericTypeConversion(sumLC);
+ this.timestamp = timestamp;
+ }
+
+ public static SumLCCounter merge(SumLCCounter current, Number sumLC, Long timestamp) {
+ SumLCCounter merged = new SumLCCounter(sumLC, timestamp);
+ return merge(current, merged);
+ }
+
+ public static SumLCCounter merge(SumLCCounter value1, SumLCCounter value2) {
+ if (value1 == null || value1.timestamp == null)
+ return value2;
+ if (value2 == null || value2.timestamp == null)
+ return value1;
+ if (value2.timestamp > value1.timestamp) {
+ return value2;
+ } else if (value1.timestamp > value2.timestamp) {
+ return value1;
+ } else {
+ return mergeSum(value1, value2);
+ }
+ }
+
+ private static SumLCCounter mergeSum(SumLCCounter cnt1, SumLCCounter cnt2) {
+ if (cnt1.sumLC == null)
+ return cnt2;
+ if (cnt2.sumLC == null)
+ return cnt1;
+ String sumLCTypeName = cnt1.sumLC.getClass().getSimpleName();
+ Number semiSum = MERGE_FUNC_MAP.get(sumLCTypeName).apply(cnt1.sumLC, cnt2.sumLC);
+ return new SumLCCounter(semiSum, cnt1.timestamp);
+ }
+
+ private static Number numericTypeConversion(Number input) {
+ if (input instanceof Byte || input instanceof Short || input instanceof Integer) {
+ return input.longValue();
+ } else if (input instanceof Float) {
+ return input.doubleValue();
+ } else {
+ return input;
+ }
+ }
+
+ public Number getSumLC() {
+ return sumLC;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+}
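SumLCCounter captures the intended semantics of sum_lc(measure_col, timestamp_col): among the rows being aggregated, only the values carrying the latest timestamp are summed, and rows with an earlier timestamp are dropped. A small standalone sketch of the merge behaviour (the values and timestamps here are made up for illustration):

import org.apache.kylin.measure.sumlc.SumLCCounter;

public class SumLcMergeSketch {
    public static void main(String[] args) {
        SumLCCounter c = null;
        c = SumLCCounter.merge(c, 10L, 1001L); // first row: sum=10 at ts=1001
        c = SumLCCounter.merge(c, 5L, 1002L);  // newer timestamp replaces the state: sum=5 at ts=1002
        c = SumLCCounter.merge(c, 7L, 1002L);  // same timestamp: sums are added -> 12
        c = SumLCCounter.merge(c, 3L, 1000L);  // older timestamp is ignored
        System.out.println(c.getSumLC() + " @ " + c.getTimestamp()); // 12 @ 1002
    }
}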
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCMeasureType.java b/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCMeasureType.java
new file mode 100644
index 0000000000..c94c29e54d
--- /dev/null
+++ b/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCMeasureType.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.sumlc;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.ImmutableMap;
+
+public class SumLCMeasureType extends MeasureType<SumLCCounter> {
+ public static final String FUNC_SUM_LC = "SUM_LC";
+ public static final String DATATYPE_SUM_LC = "sum_lc";
+ static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.of(SumLCMeasureType.FUNC_SUM_LC, SumLCAggFunc.class);
+
+ public SumLCMeasureType(String funcName, DataType dataType) {
+
+ }
+
+ @Override
+ public MeasureIngester<SumLCCounter> newIngester() {
+ return new MeasureIngester<SumLCCounter>() {
+ @Override
+ public SumLCCounter valueOf(String[] values, MeasureDesc measureDesc,
+ Map<TblColRef, Dictionary<String>> dictionaryMap) {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public MeasureAggregator<SumLCCounter> newAggregator() {
+ return new MeasureAggregator<SumLCCounter>() {
+ @Override
+ public void reset() {
+ // left over issue, default implementation ignored
+ }
+
+ @Override
+ public void aggregate(SumLCCounter value) {
+ // left over issue, default implementation ignored
+ }
+
+ @Override
+ public SumLCCounter aggregate(SumLCCounter value1, SumLCCounter value2) {
+ return null;
+ }
+
+ @Override
+ public SumLCCounter getState() {
+ return null;
+ }
+
+ @Override
+ public int getMemBytesEstimate() {
+ return 0;
+ }
+ };
+ }
+
+ @Override
+ public boolean needRewrite() {
+ return true;
+ }
+
+ @Override
+ public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
+ return UDAF_MAP;
+ }
+
+ public static class Factory extends MeasureTypeFactory<SumLCCounter> {
+
+ @Override
+ public MeasureType<SumLCCounter> createMeasureType(String funcName, DataType dataType) {
+ return new SumLCMeasureType(funcName, dataType);
+ }
+
+ @Override
+ public String getAggrFunctionName() {
+ return FUNC_SUM_LC;
+ }
+
+ @Override
+ public String getAggrDataTypeName() {
+ return DATATYPE_SUM_LC;
+ }
+
+ @Override
+ public Class<? extends DataTypeSerializer<SumLCCounter>> getAggrDataTypeSerializer() {
+ return SumLCSerializer.class;
+ }
+ }
+
+ /**
+ * This class is used for registering sum_lc to calcite schema, no need to implement the functions
+ */
+ public static class SumLCAggFunc {
+
+ public static SumLCCounter init() {
+ return null;
+ }
+
+ public static SumLCCounter add(SumLCCounter cur, Object v, Object r) {
+ return null;
+ }
+
+ public static SumLCCounter merge(SumLCCounter counter0, SumLCCounter counter1) {
+ return null;
+ }
+
+ public static Object result(SumLCCounter counter) {
+ return null;
+ }
+ }
+
+ public static class SumLCSerializer extends DataTypeSerializer<SumLCCounter> {
+
+ public SumLCSerializer(DataType dataType) {
+
+ }
+
+ @Override
+ public void serialize(SumLCCounter value, ByteBuffer out) {
+ // left over issue, default implementation ignored
+ }
+
+ @Override
+ public SumLCCounter deserialize(ByteBuffer in) {
+ return null;
+ }
+
+ @Override
+ public int peekLength(ByteBuffer in) {
+ return 0;
+ }
+
+ @Override
+ public int maxLength() {
+ return 0;
+ }
+
+ @Override
+ public int getStorageBytesEstimate() {
+ return 0;
+ }
+ }
+}
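SumLCMeasureType is deliberately thin: the ingester, aggregator and serializer stubs return nothing because the actual aggregation runs in the Spark UDAF added in SumLC.scala. What the class does contribute is the Calcite registration -- needRewrite() returns true and SumLCAggFunc is exposed through getRewriteCalciteAggrFunctions() so SUM_LC parses as an aggregate. A rough sketch, assuming the factory is used directly (the constructor ignores its arguments in this patch, so null is passed for the data type):

import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.measure.sumlc.SumLCCounter;
import org.apache.kylin.measure.sumlc.SumLCMeasureType;

public class SumLcRegistrationSketch {
    public static void main(String[] args) {
        SumLCMeasureType.Factory factory = new SumLCMeasureType.Factory();
        System.out.println(factory.getAggrFunctionName());  // SUM_LC
        System.out.println(factory.getAggrDataTypeName());  // sum_lc
        MeasureType<SumLCCounter> type = factory.createMeasureType(SumLCMeasureType.FUNC_SUM_LC, null);
        System.out.println(type.needRewrite());                    // true
        System.out.println(type.getRewriteCalciteAggrFunctions()); // {SUM_LC=class ...SumLCAggFunc}
    }
}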
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
index f9ef1bc103..ce7081c1bc 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
@@ -171,9 +171,9 @@ public class DataType implements Serializable {
}
public static DataType getType(String type) {
- if (type == null)
+ if (type == null) {
return null;
-
+ }
DataType dataType = new DataType(type);
DataType cached = CACHE.get(dataType);
if (cached == null) {
@@ -409,10 +409,13 @@ public class DataType implements Serializable {
public static final BytesSerializer<DataType> serializer = new BytesSerializer<DataType>() {
@Override
public void serialize(DataType value, ByteBuffer out) {
+ serializeDataType(value, out);
+ }
+
+ private void serializeDataType(DataType value, ByteBuffer out) {
BytesUtil.writeUTFString(value.name, out);
BytesUtil.writeVInt(value.precision, out);
BytesUtil.writeVInt(value.scale, out);
-
}
@Override
@@ -420,7 +423,6 @@ public class DataType implements Serializable {
String name = BytesUtil.readUTFString(in);
int precision = BytesUtil.readVInt(in);
int scale = BytesUtil.readVInt(in);
-
return new DataType(name, precision, scale);
}
};
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index c66ed0ce8e..b2a212f84f 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -43,6 +43,7 @@ import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.directory.api.util.Strings;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.measure.MeasureTypeFactory;
@@ -55,6 +56,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -91,21 +93,31 @@ public class FunctionDesc implements Serializable {
public static String proposeReturnType(String expression, String colDataType, Map<String, String> override,
boolean saveCheck) {
- String returnType = override.getOrDefault(expression,
- EXPRESSION_DEFAULT_TYPE_MAP.getOrDefault(expression, colDataType));
- if (saveCheck && colDataType != null && DataType.getType(colDataType).isStringFamily()) {
+ if (saveCheck) {
switch (expression) {
case FunctionDesc.FUNC_SUM:
- case FunctionDesc.FUNC_PERCENTILE:
- throw new KylinException(INVALID_MEASURE_DATA_TYPE,
- String.format(Locale.ROOT, "Invalid column type %s for measure %s", colDataType, expression));
+ case FunctionDesc.FUNC_PERCENTILE: {
+ if (colDataType != null && DataType.getType(colDataType).isStringFamily()) {
+ throw new KylinException(INVALID_MEASURE_DATA_TYPE, String.format(Locale.ROOT,
+ "Invalid column type %s for measure %s", colDataType, expression));
+ }
+ break;
+ }
+ case FunctionDesc.FUNC_SUM_LC: {
+ Preconditions.checkArgument(Strings.isNotEmpty(colDataType),
+ "SUM_LC Measure's input type shouldn't be null or empty");
+ checkSumLCDataType(colDataType);
+ break;
+ }
default:
break;
}
}
- switch (expression) {
- case FunctionDesc.FUNC_SUM:
+ String returnType = override.getOrDefault(expression,
+ EXPRESSION_DEFAULT_TYPE_MAP.getOrDefault(expression, colDataType));
+ // widen return type for sum or sum_lc measure
+ if (FunctionDesc.FUNC_SUM.equals(expression) || FunctionDesc.FUNC_SUM_LC.equals(expression)) {
if (colDataType != null) {
DataType type = DataType.getType(returnType);
if (type.isIntegerFamily()) {
@@ -119,13 +131,19 @@ public class FunctionDesc implements Serializable {
} else {
returnType = "decimal(19,4)";
}
- break;
- default:
- break;
}
return returnType;
}
+ private static void checkSumLCDataType(String dataTypeName) {
+ DataType dataType = DataType.getType(dataTypeName);
+ if (!dataType.isNumberFamily()) {
+ throw new KylinException(INVALID_MEASURE_DATA_TYPE,
+ String.format(Locale.ROOT, "SUM_LC Measure's return type '%s' is illegal. It must be one of %s",
+ dataType, DataType.NUMBER_FAMILY));
+ }
+ }
+
public static final String FUNC_SUM = "SUM";
public static final String FUNC_MIN = "MIN";
public static final String FUNC_MAX = "MAX";
@@ -147,6 +165,7 @@ public class FunctionDesc implements Serializable {
public static final String FUNC_PERCENTILE = "PERCENTILE_APPROX";
public static final String FUNC_GROUPING = "GROUPING";
public static final String FUNC_TOP_N = "TOP_N";
+ public static final String FUNC_SUM_LC = "SUM_LC";
public static final ImmutableSet<String> DIMENSION_AS_MEASURES = ImmutableSet.<String> builder()
.add(FUNC_MAX, FUNC_MIN, FUNC_COUNT_DISTINCT).build();
public static final ImmutableSet<String> NOT_SUPPORTED_FUNCTION = ImmutableSet.<String> builder().build();
@@ -195,19 +214,30 @@ public class FunctionDesc implements Serializable {
for (ParameterDesc p : getParameters()) {
if (p.isColumnType()) {
TblColRef colRef = model.findColumn(p.getValue());
- returnDataType = DataType.getType(
- proposeReturnType(expression, colRef.getDatatype(), Maps.newHashMap(), model.isSaveCheck()));
p.setValue(colRef.getIdentity());
p.setColRef(colRef);
+ if (expression.equals(FUNC_SUM_LC)) {
+ if (Objects.isNull(returnDataType)) {
+ // use the first column to init returnType and returnDataType, ignore the second timestamp column
+ returnType = proposeReturnType(expression, colRef.getDatatype(), Maps.newHashMap(),
+ model.isSaveCheck());
+ returnDataType = DataType.getType(returnType);
+ }
+ } else {
+ returnDataType = DataType.getType(proposeReturnType(expression, colRef.getDatatype(),
+ Maps.newHashMap(), model.isSaveCheck()));
+ }
}
}
- if (returnDataType == null) {
- returnDataType = DataType.getType(BIGINT);
- }
- if (!StringUtils.isEmpty(returnType)) {
- returnDataType = DataType.getType(returnType);
+ if (!expression.equals(FUNC_SUM_LC)) {
+ if (returnDataType == null) {
+ returnDataType = DataType.getType(BIGINT);
+ }
+ if (!StringUtils.isEmpty(returnType)) {
+ returnDataType = DataType.getType(returnType);
+ }
+ returnType = returnDataType.toString();
}
- returnType = returnDataType.toString();
}
private void reInitMeasureType() {
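The return type proposed for a SUM_LC measure follows the first (value) column and is widened the same way as SUM: integer-family inputs widen to bigint and decimal precision is widened (the sample model below stores a decimal(9,7) input as decimal(19,7)); other numeric types keep their own type, and string-family inputs are rejected. A short sketch against the public proposeReturnType, mirroring the new test:

import com.google.common.collect.Maps;
import org.apache.kylin.metadata.model.FunctionDesc;

public class SumLcReturnTypeSketch {
    public static void main(String[] args) {
        // integer family widens to bigint
        System.out.println(FunctionDesc.proposeReturnType("SUM_LC", "tinyint", Maps.newHashMap(), true)); // bigint
        System.out.println(FunctionDesc.proposeReturnType("SUM_LC", "bigint", Maps.newHashMap(), true));  // bigint
        // non-integer numeric types keep their own type
        System.out.println(FunctionDesc.proposeReturnType("SUM_LC", "double", Maps.newHashMap(), true));  // double
        // non-numeric input columns are rejected
        FunctionDesc.proposeReturnType("SUM_LC", "char", Maps.newHashMap(), true); // throws KylinException
    }
}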
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/util/FunctionDescTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/util/FunctionDescTest.java
index 226b8ac3b7..77ed4f6ecd 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/util/FunctionDescTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/util/FunctionDescTest.java
@@ -26,14 +26,14 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.ParameterDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
-import org.apache.kylin.metadata.model.NDataModel;
-import org.apache.kylin.metadata.model.NDataModelManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -86,6 +86,12 @@ public class FunctionDescTest extends NLocalFileMetadataTestCase {
Assert.fail();
} catch (KylinException ignored) {
}
+ Assert.assertThrows(IllegalArgumentException.class,
+ () -> FunctionDesc.proposeReturnType("SUM_LC", "", Maps.newHashMap(), true));
+ Assert.assertThrows(KylinException.class,
+ () -> FunctionDesc.proposeReturnType("SUM_LC", "char", Maps.newHashMap(), true));
+ String returnType = FunctionDesc.proposeReturnType("SUM_LC", "bigint", Maps.newHashMap(), true);
+ Assert.assertEquals("bigint", returnType);
}
@Test
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java
index d87805c140..0d2e6cbaf3 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java
@@ -26,10 +26,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
-import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.hystrix.NCircuitBreaker;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
-import org.apache.kylin.metadata.project.ProjectInstance;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -69,7 +68,7 @@ public class NProjectManagerTest extends NLocalFileMetadataTestCase {
}
val projects = projectManager.listAllProjects();
- Assert.assertEquals(27, projects.size());
+ Assert.assertEquals(28, projects.size());
Assert.assertTrue(projects.stream().noneMatch(p -> p.getName().equals("test")));
}
diff --git a/src/examples/test_case_data/localmeta/metadata/_global/project/sum_lc.json b/src/examples/test_case_data/localmeta/metadata/_global/project/sum_lc.json
new file mode 100644
index 0000000000..2fdd47af4f
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/_global/project/sum_lc.json
@@ -0,0 +1,35 @@
+{
+ "uuid" : "d1ceb3a4-4d2c-27af-5b7d-92c8a4055776",
+ "last_modified" : 1667464991353,
+ "create_time" : 1667464991347,
+ "version" : "4.0.0.0",
+ "name" : "sum_lc",
+ "owner" : "ADMIN",
+ "status" : "ENABLED",
+ "create_time_utc" : 1667464991347,
+ "default_database" : "DEFAULT",
+ "description" : "",
+ "principal" : null,
+ "keytab" : null,
+ "maintain_model_type" : "MANUAL_MAINTAIN",
+ "override_kylin_properties" : {
+ "kylin.metadata.semi-automatic-mode" : "false",
+ "kylin.query.metadata.expose-computed-column" : "true",
+ "kylin.source.default" : "9"
+ },
+ "segment_config" : {
+ "auto_merge_enabled" : false,
+ "auto_merge_time_ranges" : [ "WEEK", "MONTH", "QUARTER", "YEAR" ],
+ "volatile_range" : {
+ "volatile_range_number" : 0,
+ "volatile_range_enabled" : false,
+ "volatile_range_type" : "DAY"
+ },
+ "retention_range" : {
+ "retention_range_number" : 1,
+ "retention_range_enabled" : false,
+ "retention_range_type" : "MONTH"
+ },
+ "create_empty_segment_enabled" : false
+ }
+}
\ No newline at end of file
diff --git a/src/examples/test_case_data/localmeta/metadata/sum_lc/dataflow/f35f2937-9e4d-347a-7465-d64df939e7d6.json b/src/examples/test_case_data/localmeta/metadata/sum_lc/dataflow/f35f2937-9e4d-347a-7465-d64df939e7d6.json
new file mode 100644
index 0000000000..f5f39f4889
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/sum_lc/dataflow/f35f2937-9e4d-347a-7465-d64df939e7d6.json
@@ -0,0 +1,13 @@
+{
+ "uuid" : "f35f2937-9e4d-347a-7465-d64df939e7d6",
+ "last_modified" : 1667467391143,
+ "create_time" : 1667465241650,
+ "version" : "4.0.0.0",
+ "status" : "ONLINE",
+ "last_status" : null,
+ "cost" : 50,
+ "query_hit_count" : 0,
+ "last_query_time" : 0,
+ "layout_query_hit_count" : { },
+ "segments" : [ ]
+}
\ No newline at end of file
diff --git a/src/examples/test_case_data/localmeta/metadata/sum_lc/index_plan/f35f2937-9e4d-347a-7465-d64df939e7d6.json b/src/examples/test_case_data/localmeta/metadata/sum_lc/index_plan/f35f2937-9e4d-347a-7465-d64df939e7d6.json
new file mode 100644
index 0000000000..9b29b1fd06
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/sum_lc/index_plan/f35f2937-9e4d-347a-7465-d64df939e7d6.json
@@ -0,0 +1,43 @@
+{
+ "uuid" : "f35f2937-9e4d-347a-7465-d64df939e7d6",
+ "last_modified" : 1667467391117,
+ "create_time" : 1667293061109,
+ "version" : "4.0.0.0",
+ "description" : null,
+ "rule_based_index" : {
+ "dimensions" : [ 1, 6 ],
+ "measures" : [ 100000, 100001, 100002, 100003, 100004, 100005, 100006, 100007, 100008 ],
+ "global_dim_cap" : null,
+ "aggregation_groups" : [ {
+ "includes" : [ 1, 6 ],
+ "measures" : [ 100000, 100001, 100002, 100003, 100004, 100005, 100006, 100007, 100008 ],
+ "select_rule" : {
+ "hierarchy_dims" : [ ],
+ "mandatory_dims" : [ ],
+ "joint_dims" : [ ]
+ },
+ "index_range" : "EMPTY"
+ } ],
+ "layout_id_mapping" : [ 30001, 40001, 50001 ],
+ "parent_forward" : 3,
+ "index_start_id" : 30000,
+ "last_modify_time" : 1667370467484,
+ "layout_black_list" : [ ],
+ "scheduler_version" : 2,
+ "index_update_enabled" : true,
+ "base_layout_enabled" : true
+ },
+ "indexes" : [ ],
+ "override_properties" : { },
+ "to_be_deleted_indexes" : [ ],
+ "auto_merge_time_ranges" : null,
+ "retention_range" : 0,
+ "engine_type" : 80,
+ "next_aggregation_index_id" : 60000,
+ "next_table_index_id" : 20000000000,
+ "agg_shard_by_columns" : [ ],
+ "extend_partition_columns" : [ ],
+ "layout_bucket_num" : { },
+ "approved_additional_recs" : 0,
+ "approved_removal_recs" : 0
+}
\ No newline at end of file
diff --git a/src/examples/test_case_data/localmeta/metadata/sum_lc/model_desc/f35f2937-9e4d-347a-7465-d64df939e7d6.json b/src/examples/test_case_data/localmeta/metadata/sum_lc/model_desc/f35f2937-9e4d-347a-7465-d64df939e7d6.json
new file mode 100644
index 0000000000..ddc2b999de
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/sum_lc/model_desc/f35f2937-9e4d-347a-7465-d64df939e7d6.json
@@ -0,0 +1,267 @@
+{
+ "uuid" : "f35f2937-9e4d-347a-7465-d64df939e7d6",
+ "last_modified" : 1667465241644,
+ "create_time" : 1667293060578,
+ "version" : "4.0.0.0",
+ "alias" : "sum_lc_multi_data_type_test",
+ "owner" : "ADMIN",
+ "config_last_modifier" : null,
+ "config_last_modified" : 0,
+ "description" : null,
+ "fact_table" : "SSB.SUMLC_EXTEND_4X",
+ "fact_table_alias" : null,
+ "management_type" : "MODEL_BASED",
+ "join_tables" : [ ],
+ "filter_condition" : "",
+ "partition_desc" : {
+ "partition_date_column" : "SUMLC_EXTEND_4X.TX_DATE",
+ "partition_date_start" : 0,
+ "partition_date_format" : "yyyy-MM-dd",
+ "partition_type" : "APPEND",
+ "partition_condition_builder" : "org.apache.kylin.metadata.model.PartitionDesc$DefaultPartitionConditionBuilder"
+ },
+ "capacity" : "MEDIUM",
+ "segment_config" : {
+ "auto_merge_enabled" : null,
+ "auto_merge_time_ranges" : null,
+ "volatile_range" : null,
+ "retention_range" : null,
+ "create_empty_segment_enabled" : false
+ },
+ "data_check_desc" : null,
+ "semantic_version" : 0,
+ "storage_type" : 0,
+ "model_type" : "BATCH",
+ "all_named_columns" : [ {
+ "id" : 0,
+ "name" : "INT_DATA",
+ "column" : "SUMLC_EXTEND_4X.INT_DATA"
+ }, {
+ "id" : 1,
+ "name" : "ACCOUNT",
+ "column" : "SUMLC_EXTEND_4X.ACCOUNT",
+ "status" : "DIMENSION"
+ }, {
+ "id" : 2,
+ "name" : "TINYINT_DATA",
+ "column" : "SUMLC_EXTEND_4X.TINYINT_DATA"
+ }, {
+ "id" : 3,
+ "name" : "DOUBLE_DATA",
+ "column" : "SUMLC_EXTEND_4X.DOUBLE_DATA"
+ }, {
+ "id" : 4,
+ "name" : "SMALLINT_DATA",
+ "column" : "SUMLC_EXTEND_4X.SMALLINT_DATA"
+ }, {
+ "id" : 5,
+ "name" : "DECIMAL_DATA",
+ "column" : "SUMLC_EXTEND_4X.DECIMAL_DATA"
+ }, {
+ "id" : 6,
+ "name" : "TX_DATE",
+ "column" : "SUMLC_EXTEND_4X.TX_DATE",
+ "status" : "DIMENSION"
+ }, {
+ "id" : 7,
+ "name" : "MILLIS",
+ "column" : "SUMLC_EXTEND_4X.MILLIS"
+ }, {
+ "id" : 8,
+ "name" : "FLOAT_DATA",
+ "column" : "SUMLC_EXTEND_4X.FLOAT_DATA"
+ }, {
+ "id" : 9,
+ "name" : "BIGINT_DATA",
+ "column" : "SUMLC_EXTEND_4X.BIGINT_DATA"
+ }, {
+ "id" : 10,
+ "name" : "TINYINT_CC",
+ "column" : "SUMLC_EXTEND_4X.TINYINT_CC"
+ } ],
+ "all_measures" : [ {
+ "name" : "COUNT_ALL",
+ "function" : {
+ "expression" : "COUNT",
+ "parameters" : [ {
+ "type" : "constant",
+ "value" : "1"
+ } ],
+ "returntype" : "bigint"
+ },
+ "column" : null,
+ "comment" : null,
+ "id" : 100000,
+ "type" : "NORMAL",
+ "internal_ids" : [ ]
+ }, {
+ "name" : "sumlc_decimal_tx_date",
+ "function" : {
+ "expression" : "SUM_LC",
+ "parameters" : [ {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.DECIMAL_DATA"
+ }, {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.TX_DATE"
+ } ],
+ "returntype" : "decimal(19,7)"
+ },
+ "column" : null,
+ "comment" : "",
+ "id" : 100001,
+ "type" : "NORMAL",
+ "internal_ids" : [ ]
+ }, {
+ "name" : "sumlc_tinyint_tx_date",
+ "function" : {
+ "expression" : "SUM_LC",
+ "parameters" : [ {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.TINYINT_DATA"
+ }, {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.TX_DATE"
+ } ],
+ "returntype" : "bigint"
+ },
+ "column" : null,
+ "comment" : "",
+ "id" : 100002,
+ "type" : "NORMAL",
+ "internal_ids" : [ ]
+ }, {
+ "name" : "sumlc_smallint_tx_date",
+ "function" : {
+ "expression" : "SUM_LC",
+ "parameters" : [ {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.SMALLINT_DATA"
+ }, {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.TX_DATE"
+ } ],
+ "returntype" : "bigint"
+ },
+ "column" : null,
+ "comment" : "",
+ "id" : 100003,
+ "type" : "NORMAL",
+ "internal_ids" : [ ]
+ }, {
+ "name" : "sumlc_int_tx_date",
+ "function" : {
+ "expression" : "SUM_LC",
+ "parameters" : [ {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.INT_DATA"
+ }, {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.TX_DATE"
+ } ],
+ "returntype" : "bigint"
+ },
+ "column" : null,
+ "comment" : "",
+ "id" : 100004,
+ "type" : "NORMAL",
+ "internal_ids" : [ ]
+ }, {
+ "name" : "sumlc_bigint_tx_date",
+ "function" : {
+ "expression" : "SUM_LC",
+ "parameters" : [ {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.BIGINT_DATA"
+ }, {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.TX_DATE"
+ } ],
+ "returntype" : "bigint"
+ },
+ "column" : null,
+ "comment" : "",
+ "id" : 100005,
+ "type" : "NORMAL",
+ "internal_ids" : [ ]
+ }, {
+ "name" : "sumlc_float_tx_date",
+ "function" : {
+ "expression" : "SUM_LC",
+ "parameters" : [ {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.FLOAT_DATA"
+ }, {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.TX_DATE"
+ } ],
+ "returntype" : "double"
+ },
+ "column" : null,
+ "comment" : "",
+ "id" : 100006,
+ "type" : "NORMAL",
+ "internal_ids" : [ ]
+ }, {
+ "name" : "sumlc_double_tx_date",
+ "function" : {
+ "expression" : "SUM_LC",
+ "parameters" : [ {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.DOUBLE_DATA"
+ }, {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.TX_DATE"
+ } ],
+ "returntype" : "double"
+ },
+ "column" : null,
+ "comment" : "",
+ "id" : 100007,
+ "type" : "NORMAL",
+ "internal_ids" : [ ]
+ }, {
+ "name" : "sumlc_tinyint_cc_tx_date",
+ "function" : {
+ "expression" : "SUM_LC",
+ "parameters" : [ {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.TINYINT_CC"
+ }, {
+ "type" : "column",
+ "value" : "SUMLC_EXTEND_4X.TX_DATE"
+ } ],
+ "returntype" : "bigint"
+ },
+ "column" : null,
+ "comment" : "",
+ "id" : 100008,
+ "type" : "NORMAL",
+ "internal_ids" : [ ]
+ } ],
+ "recommendations_count" : 0,
+ "computed_columns" : [ {
+ "tableIdentity" : "SSB.SUMLC_EXTEND_4X",
+ "tableAlias" : "SUMLC_EXTEND_4X",
+ "columnName" : "TINYINT_CC",
+ "expression" : "SUMLC_EXTEND_4X.TINYINT_DATA * 2",
+ "innerExpression" : "`SUMLC_EXTEND_4X`.`TINYINT_DATA` * 2",
+ "datatype" : "INTEGER",
+ "comment" : null,
+ "rec_uuid" : null
+ } ],
+ "canvas" : {
+ "coordinate" : {
+ "SUMLC_EXTEND_4X" : {
+ "x" : 307.9999966091586,
+ "y" : 27.444441053602443,
+ "width" : 200.0,
+ "height" : 522.2222222222221
+ }
+ },
+ "zoom" : 9.0
+ },
+ "multi_partition_desc" : null,
+ "multi_partition_key_mapping" : null,
+ "fusion_id" : null
+}
\ No newline at end of file
diff --git a/src/examples/test_case_data/localmeta/metadata/sum_lc/table/SSB.SUMLC_EXTEND_4X.json b/src/examples/test_case_data/localmeta/metadata/sum_lc/table/SSB.SUMLC_EXTEND_4X.json
new file mode 100644
index 0000000000..85880c9a43
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/sum_lc/table/SSB.SUMLC_EXTEND_4X.json
@@ -0,0 +1,78 @@
+{
+ "uuid" : "5c5334e5-44ec-d8a3-0357-a63d84b56b06",
+ "last_modified" : 0,
+ "create_time" : 1667465028069,
+ "version" : "4.0.0.0",
+ "name" : "SUMLC_EXTEND_4X",
+ "columns" : [ {
+ "id" : "1",
+ "name" : "TX_DATE",
+ "datatype" : "date",
+ "case_sensitive_name" : "tx_date"
+ }, {
+ "id" : "2",
+ "name" : "MILLIS",
+ "datatype" : "bigint",
+ "case_sensitive_name" : "millis"
+ }, {
+ "id" : "3",
+ "name" : "ACCOUNT",
+ "datatype" : "varchar(4096)",
+ "case_sensitive_name" : "account"
+ }, {
+ "id" : "4",
+ "name" : "TINYINT_DATA",
+ "datatype" : "tinyint",
+ "case_sensitive_name" : "tinyint_data"
+ }, {
+ "id" : "5",
+ "name" : "SMALLINT_DATA",
+ "datatype" : "smallint",
+ "case_sensitive_name" : "smallint_data"
+ }, {
+ "id" : "6",
+ "name" : "INT_DATA",
+ "datatype" : "integer",
+ "case_sensitive_name" : "int_data"
+ }, {
+ "id" : "7",
+ "name" : "BIGINT_DATA",
+ "datatype" : "bigint",
+ "case_sensitive_name" : "bigint_data"
+ }, {
+ "id" : "8",
+ "name" : "FLOAT_DATA",
+ "datatype" : "double",
+ "case_sensitive_name" : "float_data"
+ }, {
+ "id" : "9",
+ "name" : "DOUBLE_DATA",
+ "datatype" : "double",
+ "case_sensitive_name" : "double_data"
+ }, {
+ "id" : "10",
+ "name" : "DECIMAL_DATA",
+ "datatype" : "decimal(9,7)",
+ "case_sensitive_name" : "decimal_data"
+ } ],
+ "source_type" : 9,
+ "table_type" : "MANAGED",
+ "top" : false,
+ "increment_loading" : false,
+ "last_snapshot_path" : null,
+ "last_snapshot_size" : 0,
+ "snapshot_last_modified" : 0,
+ "query_hit_count" : 0,
+ "partition_column" : null,
+ "snapshot_partitions" : { },
+ "snapshot_partitions_info" : { },
+ "snapshot_total_rows" : 0,
+ "snapshot_partition_col" : null,
+ "selected_snapshot_partition_col" : null,
+ "temp_snapshot_path" : null,
+ "snapshot_has_broken" : false,
+ "database" : "SSB",
+ "transactional" : false,
+ "rangePartition" : false,
+ "partition_desc" : null
+}
\ No newline at end of file
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
index 61720f727c..34160d76c6 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
@@ -56,6 +56,7 @@ import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
import org.apache.kylin.query.pushdown.PushDownRunnerSparkImpl;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.request.GarbageCleanUpConfigRequest;
@@ -97,7 +98,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.kyligence.kap.clickhouse.MockSecondStorage;
-import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
import lombok.val;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@@ -235,14 +235,14 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase {
public void testGetReadableProjects() {
Mockito.doReturn(true).when(aclEvaluate).hasProjectAdminPermission(Mockito.any(ProjectInstance.class));
List<ProjectInstance> projectInstances = projectService.getReadableProjects("", false);
- Assert.assertEquals(27, projectInstances.size());
+ Assert.assertEquals(28, projectInstances.size());
}
@Test
public void testGetAdminProjects() throws Exception {
Mockito.doReturn(true).when(aclEvaluate).hasProjectAdminPermission(Mockito.any(ProjectInstance.class));
List<ProjectInstance> projectInstances = projectService.getAdminProjects();
- Assert.assertEquals(27, projectInstances.size());
+ Assert.assertEquals(28, projectInstances.size());
}
@Test
@@ -256,7 +256,7 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase {
public void testGetReadableProjectsHasNoPermissionProject() {
Mockito.doReturn(true).when(aclEvaluate).hasProjectAdminPermission(Mockito.any(ProjectInstance.class));
List<ProjectInstance> projectInstances = projectService.getReadableProjects("", false);
- Assert.assertEquals(27, projectInstances.size());
+ Assert.assertEquals(28, projectInstances.size());
}
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java
index bb004b174d..73c50d3d64 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java
@@ -46,13 +46,13 @@ import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.measure.corr.CorrMeasureType;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MultiPartitionDesc;
import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.query.util.ICutContextStrategy;
import com.google.common.collect.ImmutableList;
@@ -65,7 +65,7 @@ import com.google.common.collect.Sets;
public class KapAggregateRel extends OLAPAggregateRel implements KapRel {
protected static final List<String> supportedFunction = Lists.newArrayList("SUM", "MIN", "MAX", "COUNT_DISTINCT",
- "BITMAP_UUID", "PERCENTILE_APPROX", FunctionDesc.FUNC_BITMAP_BUILD);
+ "BITMAP_UUID", "PERCENTILE_APPROX", FunctionDesc.FUNC_BITMAP_BUILD, FunctionDesc.FUNC_SUM_LC);
private ImmutableList<Integer> rewriteGroupKeys; // preserve the ordering of group keys after CC replacement
private List<ImmutableBitSet> rewriteGroupSets; // group sets with cc replaced
List<AggregateCall> aggregateCalls;
@@ -203,8 +203,8 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel {
TblColRef originalColumn = inputColumnRowType.getColumnByIndex(i);
if (null != this.context && this.context.getGroupCCColRewriteMapping().containsKey(originalColumn)) {
groups.add(this.context.getGroupCCColRewriteMapping().get(originalColumn));
- groupKeys
- .add(inputColumnRowType.getIndexByName(this.context.getGroupCCColRewriteMapping().get(originalColumn).getName()));
+ groupKeys.add(inputColumnRowType
+ .getIndexByName(this.context.getGroupCCColRewriteMapping().get(originalColumn).getName()));
} else {
Set<TblColRef> sourceColumns = inputColumnRowType.getSourceColumnsByIndex(i);
groups.addAll(sourceColumns);
@@ -307,7 +307,6 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel {
// rebuild rowType & columnRowType
this.rowType = this.deriveRowType();
this.columnRowType = this.buildColumnRowType();
-
}
private Boolean isExactlyMatched() {
@@ -330,7 +329,8 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel {
return false;
}
- if (!checkAggCall()) return false;
+ if (!checkAggCall())
+ return false;
Set<String> cuboidDimSet = new HashSet<>();
if (getContext() != null && getContext().storageContext.getCandidate() != null) {
cuboidDimSet = getContext().storageContext.getCandidate().getLayoutEntity().getOrderedDimensions().values()
@@ -403,7 +403,7 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel {
private boolean isDimExactlyMatch(Set<String> groupByCols, Set<String> cuboidDimSet) {
return groupByCols.equals(cuboidDimSet) && isSimpleGroupType()
&& (this.context.getInnerGroupByColumns().isEmpty()
- || !this.context.getGroupCCColRewriteMapping().isEmpty());
+ || !this.context.getGroupCCColRewriteMapping().isEmpty());
}
@@ -502,7 +502,8 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel {
}
public boolean isContainCountDistinct() {
- return aggregateCalls.stream().anyMatch(agg -> agg.getAggregation().getKind() == SqlKind.COUNT && agg.isDistinct());
+ return aggregateCalls.stream()
+ .anyMatch(agg -> agg.getAggregation().getKind() == SqlKind.COUNT && agg.isDistinct());
}
}
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index e944266a82..d68329c0a0 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -101,6 +101,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
protected List<AggregateCall> rewriteAggCalls;
protected List<TblColRef> groups;
protected List<FunctionDesc> aggregations;
+
public OLAPAggregateRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator,
ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls)
throws InvalidRelException {
@@ -501,7 +502,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
// rebuild aggregate call
return new AggregateCall(newAgg, false, newArgList, fieldType, callName);
-
}
/**
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
index a53f9af8e7..c1f23bb5d7 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
@@ -365,7 +365,7 @@ public class QueryHistoryServiceTest extends NLocalFileMetadataTestCase {
// get all tables
tableMap = queryHistoryService.getQueryHistoryTableMap(null);
- Assert.assertEquals(27, tableMap.size());
+ Assert.assertEquals(28, tableMap.size());
// not existing project
try {
diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
index fe1d092ee3..81439609e9 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
@@ -50,6 +50,7 @@ import org.apache.kylin.metadata.query.StructField;
import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException;
import org.apache.kylin.query.engine.data.QueryResult;
+import org.apache.kylin.query.exception.NotSupportedSQLException;
import org.apache.kylin.query.mask.QueryResultMasks;
import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.query.util.PushDownUtil;
@@ -81,6 +82,7 @@ public class QueryRoutingEngine {
queryParams.setDefaultSchema(queryExec.getDefaultSchemaName());
if (queryParams.isForcedToPushDown()) {
+ checkContainsSumLC(queryParams, null);
return pushDownQuery(null, queryParams);
}
@@ -128,6 +130,7 @@ public class QueryRoutingEngine {
if (cause instanceof SQLException && cause.getCause() instanceof KylinException) {
throw (SQLException) cause;
}
+ checkContainsSumLC(queryParams, e);
if (shouldPushdown(cause, queryParams)) {
return pushDownQuery((SQLException) cause, queryParams);
} else {
@@ -146,6 +149,7 @@ public class QueryRoutingEngine {
}
}
}
+ checkContainsSumLC(queryParams, e);
if (shouldPushdown(e, queryParams)) {
return pushDownQuery(e, queryParams);
} else {
@@ -167,6 +171,17 @@ public class QueryRoutingEngine {
return false;
}
+ private void checkContainsSumLC(QueryParams queryParams, Throwable t) {
+ if (queryParams.getSql().contains("sum_lc")) {
+ String message = "There is no aggregate index to answer this query, sum_lc() function now is not supported by other query engine";
+ if (t != null) {
+ throw new NotSupportedSQLException(message, t);
+ } else {
+ throw new NotSupportedSQLException(message);
+ }
+ }
+ }
+
private boolean shouldPushdown(Throwable e, QueryParams queryParams) {
if (queryParams.isForcedToIndex()) {
return false;
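sum_lc has no implementation outside the aggregate index, so the routing layer now fails fast whenever such a query would be pushed down, instead of surfacing an opaque error from the pushdown engine. An illustrative, standalone rendering of the guard (the real check is checkContainsSumLC above; the table and column names here are made up):

import org.apache.kylin.query.exception.NotSupportedSQLException;

public class SumLcPushdownGuardSketch {
    // mirrors QueryRoutingEngine#checkContainsSumLC from this patch
    static void rejectIfSumLc(String sql) {
        if (sql.contains("sum_lc")) {
            throw new NotSupportedSQLException("There is no aggregate index to answer this query, "
                    + "sum_lc() function now is not supported by other query engine");
        }
    }

    public static void main(String[] args) {
        rejectIfSumLc("select sum_lc(balance, tx_date) from t"); // throws before any pushdown attempt
    }
}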
diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java
index 9a1725f1bc..93fe27494f 100644
--- a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java
@@ -17,11 +17,11 @@
*/
package org.apache.kylin.query.engine;
-import static org.apache.kylin.query.util.AsyncQueryUtil.ASYNC_QUERY_JOB_ID_PRE;
import static org.apache.kylin.metadata.cube.model.NBatchConstants.P_DIST_META_URL;
import static org.apache.kylin.metadata.cube.model.NBatchConstants.P_JOB_ID;
import static org.apache.kylin.metadata.cube.model.NBatchConstants.P_QUERY_CONTEXT;
import static org.apache.kylin.metadata.cube.model.NBatchConstants.P_QUERY_PARAMS;
+import static org.apache.kylin.query.util.AsyncQueryUtil.ASYNC_QUERY_JOB_ID_PRE;
import java.io.BufferedReader;
import java.io.IOException;
@@ -45,11 +45,11 @@ import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.common.util.ShellException;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.query.util.QueryParams;
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -217,7 +217,7 @@ public class AsyncQueryJobTest extends NLocalFileMetadataTestCase {
rawResourceMap.put(zipEntry.getName(), raw);
}
}
- Assert.assertEquals(83, rawResourceMap.size());
+ Assert.assertEquals(84, rawResourceMap.size());
}
private void testKylinConfig(FileSystem workingFileSystem, FileStatus metaFileStatus) throws IOException {
diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
index f929d50a39..7599095bf6 100644
--- a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
@@ -39,6 +39,7 @@ import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException;
import org.apache.kylin.query.QueryExtension;
import org.apache.kylin.query.engine.data.QueryResult;
+import org.apache.kylin.query.exception.NotSupportedSQLException;
import org.apache.kylin.query.util.QueryParams;
import org.apache.kylin.source.adhocquery.PushdownResult;
import org.apache.spark.SparkException;
@@ -304,4 +305,19 @@ public class QueryRoutingEngineTest extends NLocalFileMetadataTestCase {
QueryResult queryResult = queryRoutingEngine.queryWithSqlMassage(queryParams);
Assert.assertEquals(0, queryResult.getSize());
}
+
+ @Test
+ public void testQueryPushDownWithSumLC() {
+ final String sql = "select sum_lc(column, dateColumn) from success_table_2";
+ final String project = "default";
+ KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+ QueryParams queryParams = new QueryParams();
+ queryParams.setProject(project);
+ queryParams.setSql(sql);
+ queryParams.setKylinConfig(kylinconfig);
+ queryParams.setSelect(true);
+ queryParams.setForcedToPushDown(true);
+
+ Assert.assertThrows(NotSupportedSQLException.class, () -> queryRoutingEngine.queryWithSqlMassage(queryParams));
+ }
}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
index e1f538bad8..f81e6f7d37 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
@@ -20,19 +20,19 @@ package org.apache.kylin.engine.spark.job
import org.apache.kylin.common.KapConfig
import org.apache.kylin.engine.spark.builder.DFBuilderHelper.ENCODE_SUFFIX
+import org.apache.kylin.measure.bitmap.BitmapMeasureType
+import org.apache.kylin.measure.hllc.HLLCMeasureType
import org.apache.kylin.metadata.cube.cuboid.NSpanningTree
import org.apache.kylin.metadata.cube.model.{NCubeJoinedFlatTableDesc, NDataSegment}
import org.apache.kylin.metadata.model.NDataModel.Measure
-import org.apache.kylin.measure.bitmap.BitmapMeasureType
-import org.apache.kylin.measure.hllc.HLLCMeasureType
import org.apache.kylin.metadata.model.TblColRef
-import org.apache.spark.sql.functions.{col, _}
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.udaf._
import org.apache.spark.sql.util.SparderTypeUtil
import org.apache.spark.sql.util.SparderTypeUtil.toSparkType
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.{Column, DataFrame}
import java.util
import java.util.Locale
@@ -60,20 +60,20 @@ object CuboidAggregator {
dataset.schema.fieldNames.zipWithIndex.map(tp => (tp._2, tp._1)).toMap
aggregate(dataset, dimensions, measures, //
- (colRef: TblColRef) => columnIndex.apply(flatTableDesc.getColumnIndex(colRef)) )
+ (colRef: TblColRef) => columnIndex.apply(flatTableDesc.getColumnIndex(colRef)))
}
/**
- * Avoid compilation error when invoking aggregate in java
- * incompatible types: Function1 is not a functional interface
- *
- * @param dataset
- * @param dimensions
- * @param measures
- * @param tableDesc
- * @param isSparkSQL
- * @return
- */
+ * Avoid compilation error when invoking aggregate in java
+ * incompatible types: Function1 is not a functional interface
+ *
+ * @param dataset
+ * @param dimensions
+ * @param measures
+ * @param tableDesc
+ * @param isSparkSQL
+ * @return
+ */
def aggregateJava(dataset: DataFrame,
dimensions: util.Set[Integer],
measures: util.Map[Integer, Measure],
@@ -223,6 +223,15 @@ object CuboidAggregator {
}
case "CORR" =>
new Column(Literal(null, DoubleType)).as(measureEntry._1.toString)
+ case "SUM_LC" =>
+ val colDataType = function.getReturnDataType
+ val sparkDataType = toSparkType(colDataType)
+ if (reuseLayout) {
+ new Column(ReuseSumLC(columns.head.expr, sparkDataType).toAggregateExpression()).as(measureEntry._1.toString)
+ } else {
+ new Column(EncodeSumLC(columns.head.expr, columns.drop(1).head.expr, sparkDataType)
+ .toAggregateExpression()).as(measureEntry._1.toString)
+ }
}
}.toSeq
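
The SUM_LC branch above follows the pattern of the other binary measures: during the first build the value column is encoded together with its date column, while a reuse build merges the binary counters already produced by the parent layout. A minimal Scala sketch of the two cases (the column parameters and the double wrap type are assumptions):

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.DoubleType
    import org.apache.spark.sql.udaf.{EncodeSumLC, ReuseSumLC}

    // valCol/dateCol come from the flat table, encodedCol from an already-built parent layout
    def sumLcAgg(reuseLayout: Boolean, valCol: Column, dateCol: Column, encodedCol: Column): Column =
      if (reuseLayout) {
        new Column(ReuseSumLC(encodedCol.expr, DoubleType).toAggregateExpression())
      } else {
        new Column(EncodeSumLC(valCol.expr, dateCol.expr, DoubleType).toAggregateExpression())
      }
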
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala
index 8a2682d460..714b42d15d 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala
@@ -17,11 +17,11 @@
*/
package org.apache.kylin.query.runtime.plan
-import org.apache.kylin.engine.spark.utils.LogEx
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rex.RexLiteral
import org.apache.calcite.sql.SqlKind
import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.engine.spark.utils.LogEx
import org.apache.kylin.metadata.model.FunctionDesc
import org.apache.kylin.query.relnode.{KapAggregateRel, KapProjectRel, KylinAggregateCall, OLAPAggregateRel}
import org.apache.kylin.query.util.RuntimeHelper
@@ -43,7 +43,8 @@ import scala.collection.JavaConverters._
// scalastyle:off
object AggregatePlan extends LogEx {
val binaryMeasureType =
- List("PERCENTILE", "PERCENTILE_APPROX", "INTERSECT_COUNT", "COUNT_DISTINCT", "BITMAP_UUID", FunctionDesc.FUNC_BITMAP_BUILD)
+ List("PERCENTILE", "PERCENTILE_APPROX", "INTERSECT_COUNT", "COUNT_DISTINCT", "BITMAP_UUID",
+ FunctionDesc.FUNC_BITMAP_BUILD, FunctionDesc.FUNC_SUM_LC)
def agg(inputs: java.util.List[DataFrame],
rel: KapAggregateRel): DataFrame = logTime("aggregate", debug = true) {
@@ -83,6 +84,10 @@ object AggregatePlan extends LogEx {
case FunctionDesc.FUNC_BITMAP_BUILD =>
val aggName = SchemaProcessor.replaceToAggravateSchemaName(index, "BITMAP_BUILD_DECODE", hash, argNames: _*)
KapFunctions.precise_bitmap_build_decode(columnName.head).alias(aggName)
+ case FunctionDesc.FUNC_SUM_LC =>
+ val aggName = SchemaProcessor.replaceToAggravateSchemaName(index, "SUM_LC_DECODE", hash, argNames: _*)
+ val sparkDataType = SparderTypeUtil.toSparkType(dataType)
+ KapFunctions.k_sum_lc_decode(columnName.head, sparkDataType.json).alias(aggName)
case _ =>
col(schemaNames.apply(call.getArgList.get(0)))
}
@@ -185,6 +190,8 @@ object AggregatePlan extends LogEx {
} else if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_PERCENTILE)) {
require(columnName.size == 2, s"Input columns size ${columnName.size} don't equal to 2.")
KapFunctions.k_percentile(columnName.head, columnName(1), dataType.getPrecision).alias(aggName)
+ } else if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_SUM_LC)) {
+ KapFunctions.k_sum_lc(columnName.head, SparderTypeUtil.toSparkType(dataType)).alias(aggName)
} else {
callUDF(registeredFuncName, columnName.toList: _*).alias(aggName)
}
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala
index eae6251f1a..7d6d9e4cb2 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala
@@ -21,13 +21,11 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.ExpressionUtils.expression
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
-import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode, CeilDateTime, DictEncode, DictEncodeV3, Expression, ExpressionInfo, FloorDateTime, ImplicitCastInputTypes, In, KapAddMonths, KapSubtractMonths, Like, Literal, PercentileDecode, PreciseCountDistinctDecode, RLike, RoundBase, SplitPart, Sum0, TimestampAdd, TimestampDiff, Truncate}
-import org.apache.spark.sql.types.{ArrayType, BinaryType, DoubleType, LongType, StringType}
import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode, CeilDateTime, DictEncode, EmptyRow, Expression, ExpressionInfo, FloorDateTime, ImplicitCastInputTypes, In, KapAddMonths, KapSubtractMonths, Like, Literal, PercentileDecode, PreciseCountDistinctDecode, RLike, RoundBase, SplitPart, Sum0, TimestampAdd, TimestampDiff, Truncate}
-import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType, DataType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType}
-import org.apache.spark.sql.udaf.{ApproxCountDistinct, IntersectCount, Percentile, PreciseBitmapBuildBase64Decode, PreciseBitmapBuildBase64WithIndex, PreciseBitmapBuildPushDown, PreciseCardinality, PreciseCountDistinct, PreciseCountDistinctAndArray, PreciseCountDistinctAndValue, ReusePreciseCountDistinct}
+import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode, CeilDateTime, DictEncode, DictEncodeV3, EmptyRow, Expression, ExpressionInfo, FloorDateTime, ImplicitCastInputTypes, In, KapAddMonths, KapSubtractMonths, Like, Literal, PercentileDecode, PreciseCountDistinctDecode, RLike, RoundBase, SplitPart, Sum0, SumLCDecode, TimestampAdd, TimestampDiff, Truncate}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.udaf._
object KapFunctions {
@@ -62,6 +60,12 @@ object KapFunctions {
def in(value: Expression, list: Seq[Expression]): Column = Column(In(value, list))
+ def k_sum_lc(measureCol: Column, wrapDataType: DataType): Column =
+ Column(ReuseSumLC(measureCol.expr, wrapDataType, wrapDataType).toAggregateExpression())
+
+ def k_sum_lc_decode(measureCol: Column, wrapDataType: String): Column =
+ Column(SumLCDecode(measureCol.expr, Literal(wrapDataType)))
+
def k_percentile(head: Column, column: Column, precision: Int): Column =
Column(Percentile(head.expr, precision, Some(column.expr), DoubleType).toAggregateExpression())
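
The two new helpers serve different query paths: k_sum_lc wraps ReuseSumLC with the wrap type as its output type, so it re-aggregates stored binary counters and yields the numeric result directly, while k_sum_lc_decode only unwraps a stored counter when no further merge is needed. A minimal Scala sketch (the cuboid DataFrame and column names are assumptions):

    import org.apache.spark.sql.KapFunctions
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.LongType

    // further roll-up is needed: merge the binary counters and return the numeric value
    val rolledUp = cuboid
      .groupBy(col("CAL_DT"))
      .agg(KapFunctions.k_sum_lc(col("SUM_LC_M1"), LongType).alias("SUM_LC_VALUE"))

    // the stored layout already matches the query's group-by: just decode the counter
    val decoded = cuboid
      .select(col("CAL_DT"), KapFunctions.k_sum_lc_decode(col("SUM_LC_M1"), LongType.json).alias("SUM_LC_VALUE"))
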
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
index ea49dac47e..321bece62c 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
@@ -22,8 +22,8 @@ import org.apache.kylin.measure.hllc.HLLCounter
import org.apache.kylin.measure.percentile.PercentileSerializer
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.types.Decimal
-import org.apache.spark.sql.udaf.BitmapSerAndDeSerObj
+import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.sql.udaf.{BitmapSerAndDeSerObj, SumLCUtil}
import java.nio.ByteBuffer
import scala.reflect.ClassTag
@@ -117,15 +117,15 @@ object ExpressionUtils {
}
def approxCountDistinctDecodeHelper(bytes: Any, precision: Any): Long = {
- val storageFormat = bytes.asInstanceOf[Array[Byte]]
- val preciseValue = precision.asInstanceOf[Int]
- if (storageFormat.nonEmpty) {
- val counter = new HLLCounter(preciseValue)
- counter.readRegisters(ByteBuffer.wrap(storageFormat))
- counter.getCountEstimate
- } else {
- 0L
- }
+ val storageFormat = bytes.asInstanceOf[Array[Byte]]
+ val preciseValue = precision.asInstanceOf[Int]
+ if (storageFormat.nonEmpty) {
+ val counter = new HLLCounter(preciseValue)
+ counter.readRegisters(ByteBuffer.wrap(storageFormat))
+ counter.getCountEstimate
+ } else {
+ 0L
+ }
}
def percentileDecodeHelper(bytes: Any, quantile: Any, precision: Any): Double = {
@@ -134,5 +134,12 @@ object ExpressionUtils {
val counter = serializer.deserialize(ByteBuffer.wrap(arrayBytes))
counter.getResultEstimateWithQuantileRatio(quantile.asInstanceOf[Decimal].toDouble)
}
-}
+ def sumLCDecodeHelper(bytes: Any, wrapDataType: Any): Number = {
+ val arrayBytes = bytes.asInstanceOf[Array[Byte]]
+ val codec = SumLCUtil.getNumericNullSafeSerializerByDataType(DataType.fromJson(wrapDataType.toString))
+ val counter = SumLCUtil.decodeToSumLCCounter(arrayBytes, codec)
+ counter.getSumLC
+ }
+
+}
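
sumLCDecodeHelper is the shared decode entry point used by SumLCDecode below: it picks a serializer from the wrap type's JSON form and reads the counter back from the binary payload. A minimal Scala sketch (the encoded value is assumed to be a payload written by the sum_lc aggregate):

    import org.apache.spark.sql.catalyst.expressions.ExpressionUtils
    import org.apache.spark.sql.types.LongType

    val encoded: Array[Byte] = ??? // binary sum_lc measure value from storage
    val value: Number = ExpressionUtils.sumLCDecodeHelper(encoded, LongType.json)
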
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala
index 24e25c940f..ea28802be5 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala
@@ -125,7 +125,6 @@ case class KapSubtractMonths(a: Expression, b: Expression)
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.util.TypeUtils
-import org.apache.spark.sql.types._
@ExpressionDescription(
usage = "_FUNC_(expr) - Returns the sum calculated from values of a group. " +
@@ -772,4 +771,65 @@ case class PercentileDecode(bytes: Expression, quantile: Expression, precision:
val newChildren = Seq(newFirst, newSecond, newThird)
super.legacyWithNewChildren(newChildren)
}
-}
\ No newline at end of file
+}
+
+case class SumLCDecode(bytes: Expression, wrapDataTypeExpr: Expression) extends BinaryExpression with ExpectsInputTypes {
+ override def left: Expression = bytes
+
+ override def right: Expression = wrapDataTypeExpr
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType, StringType)
+
+ def wrapDataType = DataType.fromJson(wrapDataTypeExpr.toString)
+
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+ val leftGen = left.genCode(ctx)
+ val rightGen = right.genCode(ctx)
+ val expressionUtils = ExpressionUtils.getClass.getName.stripSuffix("$")
+ val decimalUtil = classOf[Decimal].getName
+ val evalValue = ctx.freshName("evalValue")
+ val javaType = CodeGenerator.javaType(dataType)
+ val boxedJavaType = CodeGenerator.boxedType(javaType)
+ val commonCodeBlock =
+ code"""
+ ${leftGen.code}
+ ${rightGen.code}
+ Number $evalValue = $expressionUtils.sumLCDecodeHelper(${leftGen.value}, ${rightGen.value});
+ boolean ${ev.isNull} = $evalValue == null;
+ $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
+ """
+ val conditionCodeBlock = if (wrapDataType.isInstanceOf[DecimalType]) {
+ code"""
+ if(!${ev.isNull}) {
+ ${ev.value} = $decimalUtil.fromDecimal($evalValue);
+ }
+ """
+ } else {
+ code"""
+ if(!${ev.isNull}) {
+ ${ev.value} = ($boxedJavaType) $evalValue;
+ }
+ """
+ }
+ ev.copy(code = commonCodeBlock + conditionCodeBlock)
+ }
+
+ override protected def nullSafeEval(bytes: Any, wrapDataTypeExpr: Any): Any = {
+ val decodeVal = ExpressionUtils.sumLCDecodeHelper(bytes, wrapDataTypeExpr)
+ wrapDataType match {
+ case DecimalType() =>
+ Decimal.fromDecimal(decodeVal.asInstanceOf[java.math.BigDecimal])
+ case _ =>
+ decodeVal
+ }
+ }
+
+ override def dataType: DataType = wrapDataType
+
+ override def prettyName: String = "sum_lc_decode"
+
+ override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression = {
+ val newChildren = Seq(newLeft, newRight)
+ super.legacyWithNewChildren(newChildren)
+ }
+}
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/SparderAggFun.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/SparderAggFun.scala
index 422951291d..1397251368 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/SparderAggFun.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/SparderAggFun.scala
@@ -18,9 +18,6 @@
package org.apache.spark.sql.udf
-import java.nio.ByteBuffer
-import java.util
-
import com.google.common.collect.Maps
import org.apache.kylin.measure.MeasureAggregator
import org.apache.kylin.measure.bitmap.BitmapCounter
@@ -35,6 +32,9 @@ import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAg
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SparderTypeUtil
+import java.nio.ByteBuffer
+import java.util
+
class SparderAggFun(funcName: String, dataTp: KyDataType)
extends UserDefinedAggregateFunction
with Logging {
@@ -89,7 +89,7 @@ class SparderAggFun(funcName: String, dataTp: KyDataType)
byteBuffer = ByteBuffer.allocate(1024 * 1024)
}
- val initVal = if (isCount) {
+ val initVal = if (isCount) {
// return 0 instead of null in case of no input
measureAggregator.reset()
byteBuffer.clear()
@@ -106,6 +106,7 @@ class SparderAggFun(funcName: String, dataTp: KyDataType)
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
merge(buffer, input)
}
+
override def merge(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
measureAggregator.reset()
@@ -167,7 +168,7 @@ class SparderAggFun(funcName: String, dataTp: KyDataType)
case _ => null
}
- ret
+ ret
}
}
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/LayoutEntityConverter.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/LayoutEntityConverter.scala
index 528c8cb520..429cdfbbe9 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/LayoutEntityConverter.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/LayoutEntityConverter.scala
@@ -49,11 +49,11 @@ object LayoutEntityConverter {
bucketSpec = Option(bucketSp))
}
- def toSchema() : StructType = {
+ def toSchema(): StructType = {
genCuboidSchemaFromNCuboidLayout(layoutEntity)
}
- def toExactlySchema() : StructType = {
+ def toExactlySchema(): StructType = {
genCuboidSchemaFromNCuboidLayout(layoutEntity, true)
}
}
@@ -122,6 +122,7 @@ object LayoutEntityConverter {
genSparkStructField(i._1.toString, i._2)
}.toSeq ++ measures)
}
+
def genBucketSpec(layoutEntity: LayoutEntity, partitionColumn: Set[String]): Option[BucketSpec] = {
if (layoutEntity.getShardByColumns.isEmpty) {
Option(BucketSpec(layoutEntity.getBucketNum,
@@ -161,6 +162,7 @@ object LayoutEntityConverter {
case "COLLECT_SET" =>
val parameter = function.getParameters.get(0)
ArrayType(SparderTypeUtil.toSparkType(parameter.getColRef.getType))
+ case "SUM_LC" => BinaryType
case _ => SparderTypeUtil.toSparkType(function.getReturnDataType)
}
}
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/NullSafeValueSerializer.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/NullSafeValueSerializer.scala
index 75a035a344..92fd5dd0d5 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/NullSafeValueSerializer.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/NullSafeValueSerializer.scala
@@ -18,11 +18,11 @@
package org.apache.spark.sql.udaf
+import org.apache.spark.unsafe.types.UTF8String
+
import java.io.{DataInput, DataOutput}
import java.nio.charset.StandardCharsets
-import org.apache.spark.unsafe.types.UTF8String
-
@SerialVersionUID(1)
sealed trait NullSafeValueSerializer {
final def serialize(output: DataOutput, value: Any): Unit = {
@@ -150,6 +150,29 @@ class StringSerializer extends NullSafeValueSerializer {
class DecimalSerializer extends NullSafeValueSerializer {
override def serialize0(output: DataOutput, value: Any): Unit = {
val decimal = value.asInstanceOf[BigDecimal]
+ DecimalCodecUtil.encode(decimal, output)
+ }
+
+ override def deSerialize0(input: DataInput, length: Int): Any = {
+ DecimalCodecUtil.decode(input)
+ }
+}
+
+@SerialVersionUID(1)
+class JavaBigDecimalSerializer extends NullSafeValueSerializer {
+ override def serialize0(output: DataOutput, value: Any): Unit = {
+ val decimal = BigDecimal.apply(value.asInstanceOf[java.math.BigDecimal])
+ DecimalCodecUtil.encode(decimal, output)
+ }
+
+ override def deSerialize0(input: DataInput, length: Int): Any = {
+ val decimal = DecimalCodecUtil.decode(input)
+ decimal.asInstanceOf[BigDecimal].bigDecimal
+ }
+}
+
+object DecimalCodecUtil {
+ def encode(decimal: BigDecimal, output: DataOutput): Unit = {
val bytes = decimal.toString().getBytes(StandardCharsets.UTF_8)
output.writeInt(1 + bytes.length)
output.writeByte(decimal.scale)
@@ -157,7 +180,7 @@ class DecimalSerializer extends NullSafeValueSerializer {
output.write(bytes)
}
- override def deSerialize0(input: DataInput, length: Int): Any = {
+ def decode(input: DataInput): Any = {
val scale = input.readByte()
val length = input.readInt()
val bytes = new Array[Byte](length)
@@ -166,4 +189,3 @@ class DecimalSerializer extends NullSafeValueSerializer {
decimal.setScale(scale)
}
}
-
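
The decimal handling is now shared: DecimalCodecUtil carries the encode/decode logic that DecimalSerializer previously kept inline, and JavaBigDecimalSerializer reuses it for the java.math.BigDecimal values coming from sum_lc. A minimal round-trip sketch in Scala (deserialize(DataInput) is assumed to read the length prefix itself, as SumLCUtil relies on):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import org.apache.spark.sql.udaf.JavaBigDecimalSerializer

    val serializer = new JavaBigDecimalSerializer
    val buffer = new ByteArrayOutputStream()
    serializer.serialize(new DataOutputStream(buffer), new java.math.BigDecimal("123.45"))

    // expected to come back as a java.math.BigDecimal equal to 123.45
    val restored = serializer.deserialize(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))
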
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/SumLC.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/SumLC.scala
new file mode 100644
index 0000000000..bb68841e01
--- /dev/null
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/SumLC.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.udaf
+
+import com.esotericsoftware.kryo.KryoException
+import com.esotericsoftware.kryo.io.{Input, KryoDataInput, KryoDataOutput, Output}
+import org.apache.kylin.common.util.DateFormat
+import org.apache.kylin.measure.sumlc.SumLCCounter
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, TypedImperativeAggregate}
+import org.apache.spark.sql.types._
+
+/**
+ * Builds the sum_lc measure. There are two implementations:
+ * the non-reuse one takes two inputs, one for the value column and one for the date column;
+ * the reuse one reconstructs the measure from a parent layout.
+ *
+ * @param wrapDataType data type used to serialize the result
+ * @param outputDataType data type used to evaluate the result, see Percentile outputType
+ * @param mutableAggBufferOffset default value
+ * @param inputAggBufferOffset default value
+ */
+sealed abstract class BaseSumLC(wrapDataType: DataType,
+ outputDataType: DataType = BinaryType,
+ mutableAggBufferOffset: Int = 0,
+ inputAggBufferOffset: Int = 0)
+ extends TypedImperativeAggregate[SumLCCounter] with Serializable with Logging {
+ lazy val serializer: NullSafeValueSerializer = SumLCUtil.getNumericNullSafeSerializerByDataType(wrapDataType)
+
+ override def prettyName: String = "sum_lc"
+
+ override def eval(buffer: SumLCCounter): Any = {
+ outputDataType match {
+ case BinaryType =>
+ serialize(buffer)
+ case DecimalType() =>
+ if (buffer.getSumLC != null) {
+ Decimal.fromDecimal(buffer.getSumLC.asInstanceOf[java.math.BigDecimal])
+ } else {
+ Decimal.ZERO
+ }
+ case _ =>
+ buffer.getSumLC
+ }
+ }
+
+ override def createAggregationBuffer(): SumLCCounter = new SumLCCounter()
+
+ override def merge(buffer: SumLCCounter, input: SumLCCounter): SumLCCounter = {
+ SumLCCounter.merge(buffer, input)
+ }
+
+ override def serialize(buffer: SumLCCounter): Array[Byte] = {
+ val array: Array[Byte] = new Array[Byte](1024 * 1024)
+ val output: Output = new Output(array)
+ serialize(buffer, array, output)
+ }
+
+ private def serialize(buffer: SumLCCounter, array: Array[Byte], output: Output): Array[Byte] = {
+ try {
+ if (buffer == null) {
+ Array.empty[Byte]
+ } else {
+ output.clear()
+ val out = new KryoDataOutput(output)
+ serializer.serialize(out, buffer.getSumLC)
+ out.writeLong(buffer.getTimestamp)
+ val mark = output.position()
+ output.close()
+ array.slice(0, mark)
+ }
+ } catch {
+ case th: KryoException if th.getMessage.contains("Buffer overflow") =>
+ logWarning(s"Resize buffer size to ${array.length * 2}")
+ val updateArray = new Array[Byte](array.length * 2)
+ output.setBuffer(updateArray)
+ serialize(buffer, updateArray, output)
+ case th =>
+ throw th
+ }
+ }
+
+ override def deserialize(bytes: Array[Byte]): SumLCCounter = {
+ SumLCUtil.decodeToSumLCCounter(bytes, serializer)
+ }
+
+ override def nullable: Boolean = false
+
+ override def dataType: DataType = outputDataType
+}
+
+case class EncodeSumLC(
+ evalCol: Expression,
+ dateCol: Expression,
+ wrapDataType: DataType,
+ outputDataType: DataType = BinaryType,
+ mutableAggBufferOffset: Int = 0,
+ inputAggBufferOffset: Int = 0)
+ extends BaseSumLC(wrapDataType, outputDataType, mutableAggBufferOffset, inputAggBufferOffset) {
+
+ override def update(buffer: SumLCCounter, input: InternalRow): SumLCCounter = {
+ val columnEvalVal = evalCol.eval(input)
+ val columnVal = columnEvalVal match {
+ case decimal: Decimal =>
+ decimal.toJavaBigDecimal
+ case _ =>
+ columnEvalVal.asInstanceOf[Number]
+ }
+ val dateValStr = String.valueOf(dateCol.eval(input)).trim
+ val timestampVal = DateFormat.stringToMillis(dateValStr)
+ SumLCCounter.merge(buffer, columnVal, timestampVal)
+ }
+
+ override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate =
+ copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+ override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+ copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+ override def children: Seq[Expression] = Seq(evalCol, dateCol)
+
+ override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
+ super.legacyWithNewChildren(newChildren)
+}
+
+case class ReuseSumLC(measure: Expression,
+ wrapDataType: DataType,
+ outputDataType: DataType = BinaryType,
+ mutableAggBufferOffset: Int = 0,
+ inputAggBufferOffset: Int = 0)
+ extends BaseSumLC(wrapDataType, outputDataType, mutableAggBufferOffset, inputAggBufferOffset) {
+
+ override def update(buffer: SumLCCounter, input: InternalRow): SumLCCounter = {
+ val evalCounter = deserialize(measure.eval(input).asInstanceOf[Array[Byte]])
+ SumLCCounter.merge(buffer, evalCounter)
+ }
+
+ override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate =
+ copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+ override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+ copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+ override def children: Seq[Expression] = Seq(measure)
+
+ override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
+ super.legacyWithNewChildren(newChildren)
+
+}
+
+object SumLCUtil extends Logging {
+
+ def decodeToSumLCCounter(bytes: Array[Byte], codec: NullSafeValueSerializer): SumLCCounter = {
+ if (bytes.nonEmpty) {
+ val in = new KryoDataInput(new Input(bytes))
+ val sumLC = codec.deserialize(in).asInstanceOf[Number]
+ val timestamp = in.readLong()
+ new SumLCCounter(sumLC, timestamp)
+ } else {
+ new SumLCCounter()
+ }
+ }
+
+ def getNumericNullSafeSerializerByDataType(dataType: org.apache.spark.sql.types.DataType): NullSafeValueSerializer = {
+ dataType match {
+ case LongType => new LongSerializer
+ case DoubleType => new DoubleSerializer
+ case DecimalType() => new JavaBigDecimalSerializer
+ case dt => throw new UnsupportedOperationException("Unsupported sum_lc data type: " + dt)
+ }
+ }
+
+}
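
To make the binary layout of the new measure concrete: BaseSumLC.serialize writes the (nullable) sum value with the wrap type's serializer and then the timestamp as a long, and SumLCUtil.decodeToSumLCCounter reads them back in the same order. A minimal Scala sketch of that round trip (the sum value and timestamp are arbitrary sample numbers):

    import com.esotericsoftware.kryo.io.{KryoDataOutput, Output}
    import org.apache.kylin.measure.sumlc.SumLCCounter
    import org.apache.spark.sql.types.LongType
    import org.apache.spark.sql.udaf.SumLCUtil

    val codec = SumLCUtil.getNumericNullSafeSerializerByDataType(LongType)
    val output = new Output(new Array[Byte](1024))
    val out = new KryoDataOutput(output)
    codec.serialize(out, java.lang.Long.valueOf(42L)) // sum value first
    out.writeLong(1667836800000L)                     // then the timestamp
    val bytes = output.toBytes

    val counter: SumLCCounter = SumLCUtil.decodeToSumLCCounter(bytes, codec)
    // counter.getSumLC == 42, counter.getTimestamp == 1667836800000L
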