Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/16 07:01:31 UTC

[kylin] branch kylin5 updated (9501464d6b -> 76d574818f)

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a change to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


    from 9501464d6b Update
     new d7269e7641 fix ts query pushdown after force ts
     new 4b859f88c9 KYLIN-5314 fix bean can not be autowired
     new 0e83e2291a KYLIN-5355 support constant query like `select max(1) from t`
     new 01bac4ea3b KYLIN-5350 Update spark to 3.2.0-kylin-4.6.1.0-SNAPSHOT to fix high risk vulnerability
     new 21c31f4556 KYLIN-5351 minor fix of KYLIN-5333
     new 4ab0273f45 KYLIN-5352 Updating a healthy model does not need to set the id of the simplified measure
     new 466263216e KYLIN-5353 fix model saving failure when there are CCs of timestampdiff/timestampadd function
     new b7fff5be2a KYLIN-5354 change 'server.max-http-header-size' from 5MB to 32KB
     new cfcf97183e KYLIN-5366 Ignore conf/*.template files when upgrading
     new e23c37715b KYLIN-5365 Given an incorrect query queue, check-1700-spark-kystorage.sh prints the detailed error message
     new 03c6a0f4ea KYLIN-5364 Support case-insensitivity when using the table reloading API
     new 28f684b6f9 add connect timeout and ext config from yml
     new 39f87aa8a7 fix query NPE of fusion model
     new 6cb5d889e7 KYLIN-5363 Fix the problem of job metadata change when the number of segments in parallel build is too large
     new 76d574818f KYLIN-5362 Editing agg-groups doesn't produce extra meaningless indexes

The 15 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 build/bin/upgrade.sh                               | 321 +++++++++++++++++++++
 pom.xml                                            |   2 +-
 .../src/main/resources/application.yaml            |   2 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |   1 -
 .../org/apache/kylin/job/dao/NExecutableDao.java   |  25 ++
 .../kylin/job/execution/NExecutableManager.java    |  17 +-
 .../kylin/job/execution/DagExecutableTest.java     |   4 +
 .../kylin/metadata/cube/model/RuleBasedIndex.java  |   2 +-
 .../cube/model/RuleBasedCuboidDescTest.java        |  25 ++
 .../src/main/resources/application.yaml            |   2 +-
 .../kylin/rest/controller/v2/JobControllerV2.java  |   4 +
 .../org/apache/kylin/rest/service/JobService.java  |   1 +
 .../kylin/rest/service/DagJobServiceTest.java      |   4 +
 .../apache/kylin/rest/service/TableService.java    |   4 +-
 .../apache/kylin/rest/service/JobErrorTest.java    |   4 +-
 .../apache/kylin/rest/service/JobServiceTest.java  |  27 +-
 .../org/apache/kylin/rest/service/StageTest.java   |  47 +++
 .../kylin/rest/service/AbstractModelService.java   |   8 +-
 .../kylin/rest/service/ModelSemanticHelper.java    |  87 +++---
 .../service/ModelServiceSemanticUpdateTest.java    | 133 +++++++--
 .../kylin/rest/service/TableReloadServiceTest.java |  96 +++---
 .../src/main/resources/application.yaml            |   2 +-
 .../apache/kylin/query/relnode/OLAPJoinRel.java    |   4 +-
 .../kylin/rest/service/QueryServiceTest.java       |   3 +-
 .../engine/exec/calcite/CalciteQueryPlanExec.java  |   3 -
 .../engine/exec/sparder/SparderQueryPlanExec.java  |  21 +-
 .../kap/newten/clickhouse/ClickHouseUtils.java     |   4 +-
 .../kap/secondstorage/SecondStorageLockTest.java   |  50 +++-
 .../kap/clickhouse/ClickHouseStorage.java          |   7 +
 .../kyligence/kap/clickhouse/job/ClickHouse.java   |  11 +-
 .../management/ClickHouseConfigLoader.java         |   2 +
 .../kap/clickhouse/MockSecondStorage.java          |   2 +
 .../kap/secondstorage/config/ClusterInfo.java      |  24 +-
 src/server/src/main/resources/application.yaml     |   2 +-
 .../kylin/engine/spark/job/NSparkExecutable.java   |   5 +-
 .../engine/spark/NLocalWithSparkSessionTest.java   |  68 +++--
 .../utils/HiveTransactionTableHelperTest.java      |  17 +-
 src/spark-project/sparder/pom.xml                  |   5 +
 .../apache/spark/sql/KylinDataFrameManager.scala   |  31 +-
 .../kylin/query/sql/KylinDataFrameManagerTest.java | 116 ++++++++
 .../apache/kylin/tool/setup/KapGetClusterInfo.java |   9 +-
 .../kylin/tool/setup/YarnResourceInfoTool.java     |   5 +
 42 files changed, 1012 insertions(+), 195 deletions(-)
 create mode 100644 build/bin/upgrade.sh
 create mode 100644 src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java


[kylin] 09/15: KYLIN-5366 Ignore conf/*.template files when upgrading

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit cfcf97183e7b63c115bf72110daa762be3405fec
Author: Yinghao Lin <39...@users.noreply.github.com>
AuthorDate: Wed Oct 26 14:26:31 2022 +0800

    KYLIN-5366 Ignore conf/*.template files when upgrading
    
    from 39225
    
    Co-authored-by: Yinghao Lin <39...@users.noreply.github.com>
---
 build/bin/upgrade.sh | 321 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 321 insertions(+)

diff --git a/build/bin/upgrade.sh b/build/bin/upgrade.sh
new file mode 100644
index 0000000000..36c6ae0017
--- /dev/null
+++ b/build/bin/upgrade.sh
@@ -0,0 +1,321 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+function help() {
+    echo "Usage: upgrade.sh <OLD_KYLIN_HOME> [--silent]"
+    echo
+    echo "<OLD_KYLIN_HOME>    Specify the old version of the Kyligence Enterprise"
+    echo "                    installation directory."
+    echo
+    echo "--silent            Optional, don't enter interactive mode, automatically complete the upgrade."
+    exit 1
+}
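+
+# Example usage (hypothetical paths): run the copy of this script shipped
+# with the new package against the old installation directory; --silent
+# auto-confirms every prompt instead of asking interactively:
+#
+#     /usr/local/kylin-new/bin/upgrade.sh /usr/local/kylin --silent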
+
+function info() {
+    echo -e "\033[32m$@\033[0m"
+}
+
+function warn() {
+    echo -e "\033[33m$@\033[0m"
+}
+
+function error() {
+    echo -e "\033[31m$@\033[0m"
+}
+
+function logging() {
+    case $1 in
+        "info") shift; info $@ ;;
+        "warn") shift; warn $@ ;;
+        "error") shift; error $@ ;;
+        *) echo -e $@ ;;
+    esac
+
+    (echo -e `date '+%F %H:%M:%S'` $@ >> $upgrade_log)
+}
+
+function fail() {
+    error "...................................................[FAIL]"
+    error "Upgrade Kyligence Enterprise failed."
+    recordKylinUpgradeResult "${START_TIME}" "false" "${NEW_KYLIN_HOME}"
+    exit 1
+}
+
+function prompt() {
+    if [[ $silent -eq 0 ]]; then
+        return 0
+    fi
+
+    read -p "$@ (y/n) > " answer
+    if [[ -z $answer ]] || [[ $answer == "y" ]]; then
+        return 0
+    else
+        return 1
+    fi
+}
+
+function check_kylin_query_transformers() {
+    query_transformers=""
+    if [[ -f ${OLD_KYLIN_HOME}/conf/kylin.properties.override ]]; then
+        query_transformers=$(sed -n '/^kylin.query.transformers/p' ${OLD_KYLIN_HOME}/conf/kylin.properties.override)
+    fi
+
+    if [[ -z "${query_transformers}" && -f ${OLD_KYLIN_HOME}/conf/kylin.properties ]]; then
+        query_transformers=$(sed -n '/^kylin.query.transformers/p' ${OLD_KYLIN_HOME}/conf/kylin.properties)
+    fi
+
+    if [[ -n "${query_transformers}" && (! ${query_transformers} =~ io.kyligence.kap.query.security.RowFilter) ]]; then
+          error "Please check the value of the configuration item [kylin.query.transformers] in kylin.properties or kylin.properties.override, which needs to include [org.apache.kylin.query.security.RowFilter] class."
+          exit 1
+    fi
+}
+
+function upgrade() {
+
+    check_kylin_query_transformers
+
+    # needed by km
+    if [[ -f ${OLD_KYLIN_HOME}/pid ]]; then
+        PID=`cat ${OLD_KYLIN_HOME}/pid`
+        if ps -p $PID > /dev/null; then
+          error "Please stop the Kyligence Enterprise during the upgrade process."
+          exit 1
+        fi
+    fi
+
+    if [[ -f ${OLD_KYLIN_HOME}/grafana/pid ]]; then
+        PID=`cat ${OLD_KYLIN_HOME}/grafana/pid`
+        if ps -p $PID > /dev/null; then
+          error "Please stop the Grafana during the upgrade process."
+          exit 1
+        fi
+    fi
+
+    echo `date '+%Y-%m-%d %H:%M:%S '`"INFO : [Operation: upgrade] user:`whoami`, upgrade time:${START_TIME}" >> ${NEW_KYLIN_HOME}/logs/security.log
+    origin_version=$(awk '{print $NF}' ${OLD_KYLIN_HOME}/VERSION)
+    target_version=$(awk '{print $NF}' ${NEW_KYLIN_HOME}/VERSION)
+    echo
+    logging "warn" "Upgrade Kyligence Enterprise from ${origin_version} to ${target_version}"
+    warn "Old KYLIN_HOME is ${OLD_KYLIN_HOME}, log is at ${upgrade_log}"
+    echo
+
+    # copy LICENSE
+    logging "Copy LICENSE"
+    if [[ -f ${OLD_KYLIN_HOME}/LICENSE ]]; then
+        if prompt "'${OLD_KYLIN_HOME}/LICENSE' -> '${NEW_KYLIN_HOME}/'"; then
+            \cp -vf ${OLD_KYLIN_HOME}/LICENSE ${NEW_KYLIN_HOME}/ >> $upgrade_log || fail
+        fi
+    fi
+    info "...................................................[DONE]"
+
+    # copy kylin conf
+    # exclude '*.template' files
+    logging "Copy Kylin Conf"
+    for conf_file in $(ls -I "*.template" $OLD_KYLIN_HOME/conf); do
+        if prompt "'${OLD_KYLIN_HOME}/conf/${conf_file}' -> '${NEW_KYLIN_HOME}/conf/'"; then
+            if [[ -d ${OLD_KYLIN_HOME}/conf/${conf_file} ]]; then
+                # silent copy directory
+                \cp -rfv ${OLD_KYLIN_HOME}/conf/${conf_file} ${NEW_KYLIN_HOME}/conf/ >> $upgrade_log || fail
+            else
+                # need to delete the symbolic link first
+                \cp -vf --remove-destination ${OLD_KYLIN_HOME}/conf/${conf_file} ${NEW_KYLIN_HOME}/conf/ >> $upgrade_log || fail
+            fi
+
+        fi
+    done
+    info "...................................................[DONE]"
+
+    # copy ext jars
+    # copy ext/mysql*.jar to spark/jars
+    logging "Copy Ext Jars"
+    for jar_file in $(ls $OLD_KYLIN_HOME/lib/ext); do
+        if prompt "'${OLD_KYLIN_HOME}/lib/ext/${jar_file}' -> '${NEW_KYLIN_HOME}/lib/ext/'"; then
+            \cp -vf ${OLD_KYLIN_HOME}/lib/ext/${jar_file} ${NEW_KYLIN_HOME}/lib/ext/ >> $upgrade_log || fail
+        fi
+
+        if [[ ${jar_file} == mysql* ]];
+        then
+          if prompt "'${OLD_KYLIN_HOME}/lib/ext/${jar_file}' -> '${NEW_KYLIN_HOME}/spark/jars/'"; then
+            \cp -vf ${OLD_KYLIN_HOME}/lib/ext/${jar_file} ${NEW_KYLIN_HOME}/spark/jars/ >> $upgrade_log || fail
+          fi
+        fi
+    done
+    info "...................................................[DONE]"
+
+    # copy mysql connector jar to spark jars dir for apache hadoop platform
+    APACHE_HADOOP_CONF_DIR=`${NEW_KYLIN_HOME}/bin/get-properties.sh kylin.env.apache-hadoop-conf-dir`
+    if [ -n "${APACHE_HADOOP_CONF_DIR}" ]; then
+      logging "Copy mysql connector jar to spark jars dir for apache hadoop platform"
+      \cp -vf ${OLD_KYLIN_HOME}/lib/ext/mysql-connector-*.jar ${NEW_KYLIN_HOME}/spark/jars/ >> $upgrade_log || fail
+      info "...................................................[DONE]"
+    fi
+
+    # copy the customize directory under old kylin home
+    # such as hadoop_conf
+    logging "Copy Customize Directory"
+    OLDIFS=$IFS
+    IFS=$'\n'
+    for diff_log in $(diff -qr $OLD_KYLIN_HOME $NEW_KYLIN_HOME); do
+        if [[ $diff_log =~ (^Only in ${OLD_KYLIN_HOME}: )(.*) ]]; then
+            diff_file=${BASH_REMATCH[2]}
+            if [[ $diff_file == "meta_backups" || $diff_file == "appid" || $diff_file == "work" ]]; then
+                continue
+            fi
+            if prompt "'${OLD_KYLIN_HOME}/${diff_file}' -> '${NEW_KYLIN_HOME}/'"; then
+                cp -rfv ${OLD_KYLIN_HOME}/${diff_file} ${NEW_KYLIN_HOME}/ >> $upgrade_log || fail
+            fi
+        fi
+    done
+    IFS=$OLDIFS
+    info "...................................................[DONE]"
+
+    # Ensure krb5.conf is present under hadoop_conf if kerberos is enabled
+    logging "Copy krb5.conf"
+    if [[ -f ${OLD_KYLIN_HOME}/conf/krb5.conf ]]; then
+        if [[ -d ${NEW_KYLIN_HOME}/hadoop_conf ]]; then
+          cp -rfv ${OLD_KYLIN_HOME}/conf/krb5.conf ${NEW_KYLIN_HOME}/hadoop_conf
+        fi
+
+        if [[ -d ${NEW_KYLIN_HOME}/write_hadoop_conf ]]; then
+          cp -rfv ${OLD_KYLIN_HOME}/conf/krb5.conf ${NEW_KYLIN_HOME}/write_hadoop_conf
+        fi
+    fi
+    info "...................................................[DONE]"
+
+    logging "Copy hive*-site.xml for spark3"
+    if [[ -f ${OLD_KYLIN_HOME}/hadoop_conf/hive-site.xml ]]; then
+        if [[ -d ${NEW_KYLIN_HOME}/hadoop_conf ]]; then
+          cp -rfv ${OLD_KYLIN_HOME}/hadoop_conf/hive-site.xml ${NEW_KYLIN_HOME}/hadoop_conf/hiveserver2-site.xml
+          cp -rfv ${OLD_KYLIN_HOME}/hadoop_conf/hive-site.xml ${NEW_KYLIN_HOME}/hadoop_conf/hivemetastore-site.xml
+        fi
+
+        if [[ -d ${NEW_KYLIN_HOME}/write_hadoop_conf ]]; then
+          cp -rfv ${OLD_KYLIN_HOME}/write_hadoop_conf/hive-site.xml ${NEW_KYLIN_HOME}/write_hadoop_conf/hiveserver2-site.xml
+          cp -rfv ${OLD_KYLIN_HOME}/write_hadoop_conf/hive-site.xml ${NEW_KYLIN_HOME}/write_hadoop_conf/hivemetastore-site.xml
+        fi
+    fi
+    info "...................................................[DONE]"
+
+    # copy spark-env for spark3
+    logging "Copy spark-env for spark3"
+    if [[ -f ${OLD_KYLIN_HOME}/spark/conf/spark-env.sh ]]; then
+        if prompt "'${OLD_KYLIN_HOME}/spark/conf/spark-env.sh' -> '${NEW_KYLIN_HOME}/spark/conf/'"; then
+            \cp -vf ${OLD_KYLIN_HOME}/spark/conf/spark-env.sh ${NEW_KYLIN_HOME}/spark/conf/ >> $upgrade_log || fail
+        fi
+    fi
+    info "...................................................[DONE]"
+
+    # sed -nE 's/^([#\t ]*)(kylin\..*|kap\..*)/\2/p' kylin.properties | awk '{kv[substr($0,0,index($0,"=")-1)]=substr($0,index($0,"=")+1)} END{print kv["kylin.metadata.url"]}'
+    logging "Checking Kylin Conf"
+python <<PY
+from __future__ import print_function
+import os
+import sys
+try:
+    import commands as cmd
+except ImportError:
+    import subprocess as cmd
+
+def printer(msg, *outs):
+    for o in outs: print(msg, file=o)
+
+def getProp(prop_file):
+    if not os.path.exists(prop_file):
+        return dict()
+
+    output = cmd.getoutput("sed -nE 's/^([#\\\\t ]*)(kylin\\..*=.*|kap\\..*=.*)/\\\\2/p' %s" % prop_file)
+    prop = dict()
+    for x in output.split('\n'):
+        if x.strip() == '':
+            continue
+        prop[x[0: x.index('=')]] = x[x.index('=') + 1:]
+    return prop
+
+with open('${upgrade_log}', 'a+') as upgrade_log:
+    origin_prop = getProp('${NEW_KYLIN_HOME}/conf/kylin.properties')
+    prod_prop = dict(getProp('${NEW_KYLIN_HOME}/conf/kylin.properties'), **getProp('${NEW_KYLIN_HOME}/conf/kylin.properties.override'))
+    diffs = set(prod_prop.items()) - set(origin_prop.items())
+
+    def logging(msg):
+        printer(msg, sys.stdout, upgrade_log)
+
+    for diff in diffs:
+        logging(diff)
+PY
+    info "...................................................[DONE]"
+
+    logging "Install"
+    if prompt "'${NEW_KYLIN_HOME}' -> '${OLD_KYLIN_HOME}'"; then
+        install_dir=$(dirname $OLD_KYLIN_HOME)
+        home_name=$(basename $OLD_KYLIN_HOME)
+
+        # backup
+        now=`date '+%Y%m%d%H%M'`
+        backup_file=${home_name}_${now}.tar.gz
+        cd $install_dir && tar -zcvf ${backup_file} ${home_name} >> $upgrade_log || fail
+
+        # install
+        rm -rfv ${OLD_KYLIN_HOME} >> $upgrade_log || fail
+        mv -vf ${NEW_KYLIN_HOME} ${OLD_KYLIN_HOME} >> $upgrade_log || fail
+        info "...................................................[DONE]"
+        recordKylinUpgradeResult "${START_TIME}" "true" "${OLD_KYLIN_HOME}"
+        info "Upgrade finished!"
+        # needed by km
+        info "Backup location:${install_dir}/${backup_file}"
+    else
+        warn "...................................................[SKIP]"
+        recordKylinUpgradeResult "${START_TIME}" "true" "${NEW_KYLIN_HOME}"
+        info "Upgrade aborted because you chose to stop"
+    fi
+
+}
+
+function recordKylinUpgradeResult() {
+    logLevel=`[ "$2" == "true" ] && echo INFO || echo ERROR`
+    echo `date '+%Y-%m-%d %H:%M:%S '`"${logLevel} : [Operation: upgrade result] user:`whoami`, upgrade time:$1, success:$2" >> $3/logs/security.log
+}
+
+NEW_KYLIN_HOME=$(cd `dirname -- $0` && cd ../ && pwd -P)
+silent=1
+while [[ $# != 0 ]]; do
+    if [[ $1 == "--silent" ]]; then
+        silent=0
+    else
+        OLD_KYLIN_HOME=$(cd $1 && pwd)
+    fi
+    shift
+done
+
+if [[ -z $OLD_KYLIN_HOME ]] || [[ ! -d $OLD_KYLIN_HOME ]]; then
+    help
+fi
+
+if [[ $OLD_KYLIN_HOME == $NEW_KYLIN_HOME ]]; then
+    error "Please specify the old version of the Kyligence Enterprise installation directory."
+    help
+fi
+
+mkdir -p ${NEW_KYLIN_HOME}/logs
+upgrade_log=${NEW_KYLIN_HOME}/logs/upgrade-$(date '+%Y_%m_%d_%H_%M_%S').log
+
+set -o errexit
+set -o pipefail
+START_TIME=$(date "+%Y-%m-%d %H:%M:%S")
+upgrade
+


[kylin] 15/15: KYLIN-5362 Editing agg-groups doesn't produce extra meaningless indexes

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 76d574818f142c81ae4a4a6176222241c36f6578
Author: fanshu.kong <17...@qq.com>
AuthorDate: Tue Oct 25 16:44:54 2022 +0800

    KYLIN-5362 Editing agg-groups doesn't produce extra meaningless indexes
    
    from 38573
    
    Co-authored-by: fanshu.kong <17...@qq.com>
---
 .../kylin/metadata/cube/model/RuleBasedIndex.java  |  2 +-
 .../cube/model/RuleBasedCuboidDescTest.java        | 25 ++++++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/RuleBasedIndex.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/RuleBasedIndex.java
index 03c3d128e5..88fe1715a0 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/RuleBasedIndex.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/RuleBasedIndex.java
@@ -528,7 +528,7 @@ public class RuleBasedIndex implements Serializable {
     private void calculateCurrentSortedList(Map<Integer, Integer> mergedAndSortedIndexMap,
             List<Integer> currentSortedList, int dimensionId) {
         boolean needToAppendToTail = true;
-        Set<Integer> currentSortedSet = Sets.newHashSet(currentSortedList);
+        Set<Integer> currentSortedSet = Sets.newLinkedHashSet(currentSortedList);
         if (currentSortedSet.contains(dimensionId)) {
             return;
         }
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/RuleBasedCuboidDescTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/RuleBasedCuboidDescTest.java
index 7663330e52..404258a670 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/RuleBasedCuboidDescTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/RuleBasedCuboidDescTest.java
@@ -28,6 +28,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.cube.model.SelectRule;
 import org.apache.kylin.metadata.cube.CubeTestUtils;
 import org.apache.kylin.metadata.cube.cuboid.NAggregationGroup;
 import org.hamcrest.CoreMatchers;
@@ -35,6 +36,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.springframework.test.util.ReflectionTestUtils;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -908,6 +910,29 @@ public class RuleBasedCuboidDescTest extends NLocalFileMetadataTestCase {
         });
     }
 
+    @Test
+    public void testCalculateDimSortedList() throws Exception {
+
+        NAggregationGroup aggregationGroup1 = new NAggregationGroup();
+        aggregationGroup1.setIncludes(new Integer[] { 5, 18 });
+        aggregationGroup1.setMeasures(new Integer[] { 10000 });
+        SelectRule selectRule1 = new SelectRule();
+        selectRule1.setMandatoryDims(new Integer[] {});
+        aggregationGroup1.setSelectRule(selectRule1);
+
+        NAggregationGroup aggregationGroup2 = new NAggregationGroup();
+        aggregationGroup2.setIncludes(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 13, 14, 15, 16, 17, 18 });
+        aggregationGroup2.setMeasures(new Integer[] { 10000 });
+        SelectRule selectRule2 = new SelectRule();
+        selectRule2.setMandatoryDims(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 13, 14, 15, 16, 17, 18 });
+        aggregationGroup2.setSelectRule(selectRule2);
+
+        RuleBasedIndex ruleBasedIndex = new RuleBasedIndex();
+        List<Integer> lists = ReflectionTestUtils.invokeMethod(ruleBasedIndex, "recomputeSortedDimensions",
+                Lists.newArrayList(aggregationGroup1, aggregationGroup2));
+        Assert.assertEquals("[1, 2, 3, 4, 5, 6, 7, 8, 9, 13, 14, 15, 16, 17, 18]", lists.toString());
+    }
+
     private NIndexPlanManager getIndexPlanManager() {
         return NIndexPlanManager.getInstance(getTestConfig(), "default");
     }
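
For illustration, a minimal standalone sketch (not part of the patch) of why the one-line change matters: Sets.newHashSet drops the caller's ordering, so when the set is walked later in the method the recomputed dimension order can come out in hash order rather than the merged-and-sorted order, which materializes as an extra, meaningless index each time the agg-groups are edited. Sets.newLinkedHashSet keeps insertion order.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.LinkedHashSet;
    import java.util.List;

    public class SetOrderDemo {
        public static void main(String[] args) {
            List<Integer> dims = Arrays.asList(13, 1, 18, 5);
            // HashSet iterates in hash-bucket order, unrelated to input order.
            System.out.println(new HashSet<>(dims));       // e.g. [1, 18, 5, 13]
            // LinkedHashSet iterates in insertion order, matching the input.
            System.out.println(new LinkedHashSet<>(dims)); // [13, 1, 18, 5]
        }
    }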


[kylin] 03/15: KYLIN-5355 support constant query like `select max(1) from t`

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 0e83e2291af1de9632e822405fed5cd6b8ea3fc0
Author: fanshu.kong <17...@qq.com>
AuthorDate: Mon Oct 17 15:36:38 2022 +0800

    KYLIN-5355 support constant query like `select max(1) from t`
    
    Co-authored-by: fanshu.kong <17...@qq.com>
---
 .../java/org/apache/kylin/query/relnode/OLAPJoinRel.java    |  4 +++-
 .../org/apache/kylin/rest/service/QueryServiceTest.java     |  3 ++-
 .../query/engine/exec/calcite/CalciteQueryPlanExec.java     |  3 ---
 .../query/engine/exec/sparder/SparderQueryPlanExec.java     | 13 ++++++++++++-
 4 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java
index 062669242d..749f8d089c 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java
@@ -303,7 +303,9 @@ public class OLAPJoinRel extends EnumerableJoin implements OLAPRel {
 
         PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray());
         RelOptTable factTable = context.firstTableScan.getTable();
-        MethodCallExpression exprCall = Expressions.call(factTable.getExpression(OLAPTable.class), "executeOLAPQuery",
+        // query result is wrong for constant aggregation queries like "select min(2+2), max(2) from EmptyTable"
+        String execFunc = context.isConstantQueryWithAggregations() ? "executeSimpleAggregationQuery" : "executeOLAPQuery";
+        MethodCallExpression exprCall = Expressions.call(factTable.getExpression(OLAPTable.class), execFunc,
                 implementor.getRootExpression(), Expressions.constant(context.id));
         return implementor.result(physType, Blocks.toBlock(exprCall));
     }
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index 5011e83b98..3a6f6670da 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -1676,7 +1676,8 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
     public void testQueryWithConstant() throws SQLException {
         doTestQueryWithConstant("select current_timestamp");
         doTestQueryWithConstant("select 1,2,3,4,5");
-
+        doTestQueryWithConstant("select max(1) from TEST_ACCOUNT inner join TEST_MEASURE "
+                + "on TEST_ACCOUNT.ACCOUNT_ID = TEST_MEASURE.ID1");
     }
 
     private void doTestQueryWithConstant(String testSql) {
diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/exec/calcite/CalciteQueryPlanExec.java b/src/query/src/main/java/org/apache/kylin/query/engine/exec/calcite/CalciteQueryPlanExec.java
index 3a8514e253..3d03f5772e 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/exec/calcite/CalciteQueryPlanExec.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/exec/calcite/CalciteQueryPlanExec.java
@@ -39,7 +39,6 @@ import org.apache.kylin.common.QueryTrace;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.query.engine.exec.QueryPlanExec;
 import org.apache.kylin.query.engine.meta.MutableDataContext;
-import org.apache.kylin.query.relnode.KapRel;
 
 /**
  * implement and execute a physical plan with Calcite
@@ -51,8 +50,6 @@ public class CalciteQueryPlanExec implements QueryPlanExec {
     public List<List<String>> execute(RelNode rel, MutableDataContext dataContext) {
         QueryContext.currentTrace().startSpan(QueryTrace.EXECUTION);
         initContextVars(dataContext);
-        // allocate the olapContext anyway since it's being checked by some unit tests
-        new KapRel.OLAPContextImplementor().allocateContext((KapRel) rel.getInput(0), rel);
 
         List<List<String>> result = doExecute(rel, dataContext);
 
diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java b/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java
index 225c110baf..95b6139c52 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java
@@ -36,6 +36,7 @@ import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
 import org.apache.kylin.metadata.cube.model.IndexEntity;
 import org.apache.kylin.query.engine.exec.ExecuteResult;
 import org.apache.kylin.query.engine.exec.QueryPlanExec;
+import org.apache.kylin.query.engine.exec.calcite.CalciteQueryPlanExec;
 import org.apache.kylin.query.engine.meta.MutableDataContext;
 import org.apache.kylin.query.engine.meta.SimpleDataContext;
 import org.apache.kylin.query.relnode.ContextUtil;
@@ -70,12 +71,18 @@ public class SparderQueryPlanExec implements QueryPlanExec {
         // select realizations
         selectRealization(rel);
 
+        val contexts = ContextUtil.listContexts();
+        for (OLAPContext context : contexts) {
+            if (hasEmptyRealization(context)) {
+                return new CalciteQueryPlanExec().executeToIterable(rel, dataContext);
+            }
+        }
+
         // skip if no segment is selected
         // check contentQuery and runConstantQueryLocally for UT cases to make sure SparderEnv.getDF is not null
         // TODO refactor IT tests and remove this runConstantQueryLocally checking
         if (!(dataContext instanceof SimpleDataContext) || !(((SimpleDataContext) dataContext)).isContentQuery()
                 || KapConfig.wrap(((SimpleDataContext) dataContext).getKylinConfig()).runConstantQueryLocally()) {
-            val contexts = ContextUtil.listContexts();
             for (OLAPContext context : contexts) {
                 if (context.olapSchema != null && context.storageContext.isEmptyLayout() && !context.isHasAgg()) {
                     QueryContext.fillEmptyResultSetMetrics();
@@ -110,6 +117,10 @@ public class SparderQueryPlanExec implements QueryPlanExec {
                 && !QueryContext.current().getSecondStorageUsageMap().isEmpty();
     }
 
+    private static boolean hasEmptyRealization(OLAPContext context) {
+        return context.realization == null && context.isConstantQueryWithAggregations();
+    }
+
     protected ExecuteResult internalCompute(QueryEngine queryEngine, DataContext dataContext, RelNode rel) {
         try {
             return queryEngine.computeToIterable(dataContext, rel);
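
Condensed, the fallback added to SparderQueryPlanExec above amounts to the following (a sketch using only identifiers from the hunks, not a verbatim excerpt):

    // If realization selection left an OLAP context empty but the query is a
    // constant query that still carries aggregations (e.g. "select max(1) from t"),
    // there is no index to scan, so the whole plan is handed to the local
    // Calcite executor instead of Sparder.
    for (OLAPContext context : ContextUtil.listContexts()) {
        if (context.realization == null && context.isConstantQueryWithAggregations()) {
            return new CalciteQueryPlanExec().executeToIterable(rel, dataContext);
        }
    }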


[kylin] 12/15: add connect timeout and ext config from yml

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 28f684b6f9ab8d392fac17675376ab0b5814cd28
Author: Zhixiong Chen <ch...@apache.org>
AuthorDate: Thu Oct 27 15:42:25 2022 +0800

    add connect timeout and ext config from yml
    
    Co-authored-by: chenzhx <ch...@apache.io>
---
 .../kap/newten/clickhouse/ClickHouseUtils.java     |  4 +-
 .../kap/clickhouse/ClickHouseStorage.java          |  7 +++
 .../kyligence/kap/clickhouse/job/ClickHouse.java   | 11 +++-
 .../management/ClickHouseConfigLoader.java         |  2 +
 .../kap/clickhouse/MockSecondStorage.java          |  2 +
 .../kap/secondstorage/config/ClusterInfo.java      | 24 +++++++-
 .../engine/spark/NLocalWithSparkSessionTest.java   | 68 ++++++++++++----------
 .../utils/HiveTransactionTableHelperTest.java      | 17 +++++-
 8 files changed, 96 insertions(+), 39 deletions(-)

diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java
index 0ca4863f8c..c6431ad78c 100644
--- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java
+++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java
@@ -275,7 +275,7 @@ public class ClickHouseUtils {
         int pairNum = clickhouse.length / replica;
         IntStream.range(0, pairNum).forEach(idx -> clusterNode.put("pair" + idx, new ArrayList<>()));
         ClusterInfo cluster = new ClusterInfo().setKeepAliveTimeout("600000").setSocketTimeout("600000")
-                .setCluster(clusterNode);
+                .setConnectTimeout("3000").setExtConfig("maxWait=10").setCluster(clusterNode);
         int i = 0;
         for (JdbcDatabaseContainer<?> jdbcDatabaseContainer : clickhouse) {
             Node node = new Node();
@@ -302,7 +302,7 @@ public class ClickHouseUtils {
         int pairNum = clickhouse.length / replica;
         IntStream.range(0, pairNum).forEach(idx -> clusterNode.put("pair" + idx, new ArrayList<>()));
         ClusterInfo cluster = new ClusterInfo().setKeepAliveTimeout("600000").setSocketTimeout("600000")
-                .setCluster(clusterNode);
+                .setConnectTimeout("3000").setExtConfig("maxWait=10").setCluster(clusterNode);
         int i = 0;
         for (JdbcDatabaseContainer<?> jdbcDatabaseContainer : clickhouse) {
             Node node = new Node();
diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java
index 0f4ebbfb83..9e5c578127 100644
--- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java
+++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java
@@ -178,6 +178,13 @@ public class ClickHouseStorage implements SecondStoragePlugin {
         if (StringUtils.isNotEmpty(cluster.getSocketTimeout())) {
             param.put(ClickHouse.SOCKET_TIMEOUT, cluster.getSocketTimeout());
         }
+        if (StringUtils.isNotEmpty(cluster.getConnectTimeout())) {
+            int timeout = Integer.parseInt(cluster.getConnectTimeout()) / 1000;
+            param.put(ClickHouse.CONNECT_TIMEOUT, Integer.toString(timeout));
+        }
+        if (StringUtils.isNotEmpty(cluster.getExtConfig())) {
+            param.put(ClickHouse.EXT_CONFIG, cluster.getExtConfig());
+        }
         if (StringUtils.isNotEmpty(node.getUser())) {
             param.put(ClickHouse.USER, node.getUser());
         }
diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java
index 7a0100c17b..9664bde2e7 100644
--- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java
+++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java
@@ -52,6 +52,8 @@ public class ClickHouse implements Closeable {
     public static final String USER = "user";
     public static final String SOCKET_TIMEOUT = "socket_timeout";
     public static final String KEEP_ALIVE_TIMEOUT = "keepAliveTimeout";
+    public static final String CONNECT_TIMEOUT = "connect_timeout";
+    public static final String EXT_CONFIG = "extConfig";
     public static final String CLIENT_NAME = "client_name";
 
     private final String shardName;
@@ -110,8 +112,15 @@ public class ClickHouse implements Closeable {
         if (!param.isEmpty()) {
             base.append('?');
             List<String> paramList = new ArrayList<>();
-            param.forEach((name, value) -> paramList.add(name + "=" + value));
+            param.forEach((name, value) -> {
+              if (!ClickHouse.EXT_CONFIG.equals(name)){
+                paramList.add(name + "=" + value);
+              }
+            });
             base.append(String.join("&", paramList));
+            if(param.get(ClickHouse.EXT_CONFIG) != null) {
+                base.append("&").append(param.get(ClickHouse.EXT_CONFIG));
+            }
         }
         return base.toString();
     }
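
To make the assembly concrete, a standalone sketch of the logic above (host and parameter values are made up): every regular parameter is rendered as name=value, while the raw extConfig string is appended verbatim, so users can pass pre-formed pairs such as maxWait=10 through the yml.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class UrlDemo {
        static final String EXT_CONFIG = "extConfig";

        static String buildUrl(String base, Map<String, String> param) {
            StringBuilder sb = new StringBuilder(base);
            if (!param.isEmpty()) {
                sb.append('?');
                List<String> pairs = new ArrayList<>();
                // every parameter except extConfig becomes a name=value pair
                param.forEach((k, v) -> { if (!EXT_CONFIG.equals(k)) pairs.add(k + "=" + v); });
                sb.append(String.join("&", pairs));
                // extConfig already holds "k=v[&k=v...]", append it as-is
                if (param.get(EXT_CONFIG) != null) {
                    sb.append('&').append(param.get(EXT_CONFIG));
                }
            }
            return sb.toString();
        }

        public static void main(String[] args) {
            Map<String, String> param = new LinkedHashMap<>();
            param.put("socket_timeout", "600000");
            param.put("connect_timeout", "3"); // 3000 ms / 1000, per ClickHouseStorage above
            param.put(EXT_CONFIG, "maxWait=10");
            // prints jdbc:clickhouse://host:8123?socket_timeout=600000&connect_timeout=3&maxWait=10
            System.out.println(buildUrl("jdbc:clickhouse://host:8123", param));
        }
    }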
diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java
index a9f2794065..06c4143328 100644
--- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java
+++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java
@@ -66,10 +66,12 @@ public class ClickHouseConfigLoader implements SecondStorageConfigLoader {
         clusterDesc.addPropertyParameters("cluster", String.class, List.class);
         clusterDesc.addPropertyParameters("socketTimeout", String.class);
         clusterDesc.addPropertyParameters("keepAliveTimeout", String.class);
+        clusterDesc.addPropertyParameters("connectTimeout", String.class);
         clusterDesc.addPropertyParameters("installPath", String.class);
         clusterDesc.addPropertyParameters("logPath", String.class);
         clusterDesc.addPropertyParameters("userName", String.class);
         clusterDesc.addPropertyParameters("password", String.class);
+        clusterDesc.addPropertyParameters("extConfig", String.class);
         constructor.addTypeDescription(clusterDesc);
         val nodeDesc = new TypeDescription(Node.class);
         nodeDesc.addPropertyParameters("name", String.class);
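
For reference, a hypothetical cluster yaml exercising the two new keys (the property names come from the TypeDescription registrations above; the node fields and all values are illustrative only):

    cluster:
      pair0:
        - name: node01
    socketTimeout: "600000"
    keepAliveTimeout: "600000"
    connectTimeout: "3000"    # new: milliseconds, mapped to the JDBC connect_timeout (seconds)
    extConfig: "maxWait=10"   # new: raw pairs appended verbatim to the JDBC URL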
diff --git a/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java b/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java
index 60191478a6..50f58b933a 100644
--- a/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java
+++ b/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java
@@ -49,6 +49,7 @@ public class MockSecondStorage {
         ClusterInfo cluster = new ClusterInfo();
         cluster.setKeepAliveTimeout("600000");
         cluster.setSocketTimeout("600000");
+        cluster.setConnectTimeout("3000");
         cluster.setCluster(Collections.emptyMap());
         File file = File.createTempFile("clickhouse", ".yaml");
         ClickHouseConfigLoader.getConfigYaml().dump(JsonUtil.readValue(JsonUtil.writeValueAsString(cluster),
@@ -62,6 +63,7 @@ public class MockSecondStorage {
         ClusterInfo cluster = new ClusterInfo();
         cluster.setKeepAliveTimeout("600000");
         cluster.setSocketTimeout("600000");
+        cluster.setConnectTimeout("3000");
         Map<String, List<Node>> clusterNodes = new HashMap<>();
         cluster.setCluster(clusterNodes);
         val it = nodes.listIterator();
diff --git a/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java b/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java
index e57010a1f9..ab29f99695 100644
--- a/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java
+++ b/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java
@@ -36,12 +36,14 @@ public class ClusterInfo {
     private Map<String, List<Node>> cluster;
     private String socketTimeout;
     private String keepAliveTimeout;
+    private String connectTimeout;
     private String installPath;
     private String logPath;
 
     //username of machine
     private String userName;
     private String password;
+    private String extConfig;
 
     @JsonIgnore
     public List<Node> getNodes() {
@@ -93,6 +95,24 @@ public class ClusterInfo {
         return keepAliveTimeout;
     }
 
+    public String getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    public ClusterInfo setConnectTimeout(String connectTimeout) {
+        this.connectTimeout = connectTimeout;
+        return this;
+    }
+
+    public String getExtConfig() {
+        return extConfig;
+    }
+
+    public ClusterInfo setExtConfig(String extConfig) {
+        this.extConfig = extConfig;
+        return this;
+    }
+
     public ClusterInfo setKeepAliveTimeout(String keepAliveTimeout) {
         this.keepAliveTimeout = keepAliveTimeout;
         return this;
@@ -135,11 +155,13 @@ public class ClusterInfo {
     public ClusterInfo(ClusterInfo cluster) {
         this.cluster = Maps.newHashMap(cluster.getCluster());
         this.keepAliveTimeout = cluster.getKeepAliveTimeout();
-        this.socketTimeout = cluster.getKeepAliveTimeout();
+        this.socketTimeout = cluster.getSocketTimeout();
+        this.connectTimeout = cluster.getConnectTimeout();
         this.logPath = cluster.getLogPath();
         this.userName = cluster.getUserName();
         this.password = cluster.getPassword();
         this.installPath = cluster.getInstallPath();
+        this.extConfig = cluster.getExtConfig();
     }
 
     public boolean emptyCluster() {
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
index 326386b865..df13963a68 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
@@ -17,10 +17,15 @@
  */
 package org.apache.kylin.engine.spark;
 
-import com.google.common.base.Preconditions;
-import org.apache.kylin.engine.spark.job.NSparkMergingJob;
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
+import java.io.File;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.util.Shell;
@@ -28,6 +33,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.common.util.TempMetadataBuilder;
+import org.apache.kylin.engine.spark.job.NSparkMergingJob;
 import org.apache.kylin.engine.spark.merger.AfterMergeOrRefreshResourceMerger;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -60,14 +66,10 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.sparkproject.guava.collect.Sets;
 
-import java.io.File;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Objects;
-import java.util.Random;
-import java.util.Set;
+import com.google.common.base.Preconditions;
+
+import lombok.val;
+import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class NLocalWithSparkSessionTest extends NLocalFileMetadataTestCase implements Serializable {
@@ -129,7 +131,9 @@ public class NLocalWithSparkSessionTest extends NLocalFileMetadataTestCase imple
 
     @AfterClass
     public static void afterClass() {
-        ss.close();
+        if (ss != null) {
+            ss.close();
+        }
         FileUtils.deleteQuietly(new File("../kap-it/metastore_db"));
     }
 
@@ -211,29 +215,29 @@ public class NLocalWithSparkSessionTest extends NLocalFileMetadataTestCase imple
 
         if (type.isIntegerFamily())
             switch (type.getName()) {
-                case "tinyint":
-                    return DataTypes.ByteType;
-                case "smallint":
-                    return DataTypes.ShortType;
-                case "integer":
-                case "int4":
-                    return DataTypes.IntegerType;
-                default:
-                    return DataTypes.LongType;
+            case "tinyint":
+                return DataTypes.ByteType;
+            case "smallint":
+                return DataTypes.ShortType;
+            case "integer":
+            case "int4":
+                return DataTypes.IntegerType;
+            default:
+                return DataTypes.LongType;
             }
 
         if (type.isNumberFamily())
             switch (type.getName()) {
-                case "float":
-                    return DataTypes.FloatType;
-                case "double":
-                    return DataTypes.DoubleType;
-                default:
-                    if (type.getPrecision() == -1 || type.getScale() == -1) {
-                        return DataTypes.createDecimalType(19, 4);
-                    } else {
-                        return DataTypes.createDecimalType(type.getPrecision(), type.getScale());
-                    }
+            case "float":
+                return DataTypes.FloatType;
+            case "double":
+                return DataTypes.DoubleType;
+            default:
+                if (type.getPrecision() == -1 || type.getScale() == -1) {
+                    return DataTypes.createDecimalType(19, 4);
+                } else {
+                    return DataTypes.createDecimalType(type.getPrecision(), type.getScale());
+                }
             }
 
         if (type.isStringFamily())
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java
index 185c9fc0ff..dca40c9f68 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java
@@ -37,8 +37,11 @@ import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
@@ -51,6 +54,15 @@ public class HiveTransactionTableHelperTest extends NLocalWithSparkSessionTest {
     private final String STORAGE_DFS_DIR = "/test";
     private final String FILED_DELIMITER = "|";
 
+    @BeforeClass
+    public static void beforeClass() {
+        if (SparderEnv.isSparkAvailable()) {
+            SparderEnv.getSparkSession().close();
+        }
+        SparkSession.clearActiveSession();
+        SparkSession.clearDefaultSession();
+    }
+
     @Before
     public void setup() {
         {
@@ -72,9 +84,8 @@ public class HiveTransactionTableHelperTest extends NLocalWithSparkSessionTest {
         SystemPropertiesCache.setProperty("kylin.source.provider.9",
                 "io.kyligence.kap.engine.spark.source.NSparkDataSource");
         SystemPropertiesCache.setProperty("kylin.build.resource.read-transactional-table-enabled", "true");
-        KylinBuildEnv kylinBuildEnv = KylinBuildEnv.getOrCreate(getTestConfig());
-        NTableMetadataManager tableMgr = NTableMetadataManager
-                .getInstance(getTestConfig(), "tdh");
+        KylinBuildEnv kylinBuildEnv = new KylinBuildEnv(getTestConfig());
+        NTableMetadataManager tableMgr = NTableMetadataManager.getInstance(getTestConfig(), "tdh");
         TableDesc fact = tableMgr.getTableDesc("TDH_TEST.LINEORDER_PARTITION");
         fact.setTransactional(true);
         String result = HiveTransactionTableHelper.doGetQueryHiveTemporaryTableSql(fact, Maps.newHashMap(), "LO_ORDERKEY", kylinBuildEnv);


[kylin] 13/15: fix query NPE of fusion model

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 39f87aa8a762f75ddd28f5cd9a3eac13a6391817
Author: binbin.zheng <bi...@kyligence.io>
AuthorDate: Tue Oct 25 19:34:33 2022 +0800

    fix query NPE of fusion model
    
    Co-authored-by: binbin.zheng <bi...@kyligence.io>
---
 src/spark-project/sparder/pom.xml                  |   5 +
 .../apache/spark/sql/KylinDataFrameManager.scala   |  31 +++---
 .../kylin/query/sql/KylinDataFrameManagerTest.java | 116 +++++++++++++++++++++
 3 files changed, 137 insertions(+), 15 deletions(-)

diff --git a/src/spark-project/sparder/pom.xml b/src/spark-project/sparder/pom.xml
index e6f02b8f1d..1cccda51c6 100644
--- a/src/spark-project/sparder/pom.xml
+++ b/src/spark-project/sparder/pom.xml
@@ -128,6 +128,11 @@
             <artifactId>junit-vintage-engine</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
index d23651eb8f..115e6c9f28 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
@@ -19,10 +19,11 @@
 package org.apache.spark.sql
 
 import java.sql.Timestamp
-import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataflow, NDataflowManager}
-import org.apache.kylin.metadata.model.FusionModelManager
+
 import io.kyligence.kap.secondstorage.SecondStorage
 import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataflow, NDataflowManager}
+import org.apache.kylin.metadata.model.FusionModelManager
 import org.apache.spark.sql.datasource.storage.StorageStoreFactory
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.StructType
@@ -78,26 +79,26 @@ class KylinDataFrameManager(sparkSession: SparkSession) {
     option("pruningInfo", pruningInfo)
     if (dataflow.isStreaming && dataflow.getModel.isFusionModel) {
       val fusionModel = FusionModelManager.getInstance(KylinConfig.getInstanceFromEnv, dataflow.getProject)
-              .getFusionModel(dataflow.getModel.getFusionId)
+        .getFusionModel(dataflow.getModel.getFusionId)
       val batchModelId = fusionModel.getBatchModel.getUuid
       val batchDataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv, dataflow.getProject).getDataflow(batchModelId)
       val end = batchDataflow.getDateRangeEnd
 
       val partition = dataflow.getModel.getPartitionDesc.getPartitionDateColumnRef
       val id = layout.getOrderedDimensions.inverse().get(partition)
-      SecondStorage.trySecondStorage(sparkSession, dataflow, layout, pruningInfo).getOrElse {
-        var df = StorageStoreFactory.create(dataflow.getModel.getStorageType)
-          .read(dataflow, layout, sparkSession, extraOptions.toMap)
-        if (end != Long.MinValue) {
-          df = df.filter(col(id.toString).geq(new Timestamp(end)))
-        }
-        df
-      }
-    } else {
-      SecondStorage.trySecondStorage(sparkSession, dataflow, layout, pruningInfo).getOrElse {
-        StorageStoreFactory.create(dataflow.getModel.getStorageType)
-          .read(dataflow, layout, sparkSession, extraOptions.toMap)
+      var df = read(dataflow, layout, pruningInfo)
+      if (id != null && end != Long.MinValue) {
+        df = df.filter(col(id.toString).geq(new Timestamp(end)))
       }
+      return df
+    }
+    read(dataflow, layout, pruningInfo)
+  }
+
+  def read(dataflow: NDataflow, layout: LayoutEntity, pruningInfo: String): DataFrame = {
+    SecondStorage.trySecondStorage(sparkSession, dataflow, layout, pruningInfo).getOrElse {
+      StorageStoreFactory.create(dataflow.getModel.getStorageType)
+        .read(dataflow, layout, sparkSession, extraOptions.toMap)
     }
   }
 
diff --git a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java
new file mode 100644
index 0000000000..7327c443d0
--- /dev/null
+++ b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.query.sql;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.apache.kylin.metadata.cube.model.LayoutEntity;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.model.FusionModelManager;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.TimeRange;
+import org.apache.spark.sql.KylinDataFrameManager;
+import org.apache.spark.sql.SparkSession;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.util.ReflectionUtils;
+
+import com.google.common.collect.ImmutableBiMap;
+
+import lombok.val;
+import lombok.var;
+
+@MetadataInfo(project = "streaming_test")
+class KylinDataFrameManagerTest {
+
+    @Test
+    void testCuboidTableOfFusionModel() {
+        val ss = SparkSession.builder().appName("local").master("local[1]").getOrCreate();
+        val config = KylinConfig.getInstanceFromEnv();
+        val dataflowManager = NDataflowManager.getInstance(config, "streaming_test");
+        var dataflow = dataflowManager.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d");
+        Assert.assertTrue(dataflow.isStreaming() && dataflow.getModel().isFusionModel());
+
+        val kylinDataFrameManager = Mockito.spy(new KylinDataFrameManager(ss));
+        kylinDataFrameManager.option("isFastBitmapEnabled", "false");
+        {
+            // condition: id != null && end != Long.MinValue
+            val partitionTblCol = dataflow.getModel().getPartitionDesc().getPartitionDateColumnRef();
+            val layoutEntity = Mockito.spy(new LayoutEntity());
+            ImmutableBiMap.Builder<Integer, TblColRef> dimsBuilder = ImmutableBiMap.builder();
+            ImmutableBiMap<Integer, TblColRef> orderedDimensions = dimsBuilder.put(1, partitionTblCol).build();
+            Mockito.when(layoutEntity.getOrderedDimensions()).thenReturn(orderedDimensions);
+            val df = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, "3e560d22-b749-48c3-9f64-d4230207f120");
+            Assert.assertEquals(1, df.columns().length);
+        }
+        {
+            // condition: id == null
+            val df = kylinDataFrameManager.cuboidTable(dataflow, new LayoutEntity(),
+                    "3e560d22-b749-48c3-9f64-d4230207f120");
+            Assert.assertEquals(0, df.columns().length);
+        }
+
+        {
+            // condition: end == Long.MinValue
+            val partitionTblCol = dataflow.getModel().getPartitionDesc().getPartitionDateColumnRef();
+            val layoutEntity = Mockito.spy(new LayoutEntity());
+            ImmutableBiMap.Builder<Integer, TblColRef> dimsBuilder = ImmutableBiMap.builder();
+            ImmutableBiMap<Integer, TblColRef> orderedDimensions = dimsBuilder.put(1, partitionTblCol).build();
+            Mockito.when(layoutEntity.getOrderedDimensions()).thenReturn(orderedDimensions);
+            val fusionModel = FusionModelManager.getInstance(config, dataflow.getProject())
+                    .getFusionModel(dataflow.getModel().getFusionId());
+            val batchModelId = fusionModel.getBatchModel().getUuid();
+            val batchDataflow = NDataflowManager.getInstance(config, dataflow.getProject()).getDataflow(batchModelId);
+
+            dataflowManager.updateDataflow(batchDataflow.getId(), updater -> {
+                updater.getSegments().forEach(seg -> {
+                    try {
+                        val timeRange = seg.getTSRange();
+                        val field = TimeRange.class.getDeclaredField("end");
+                        field.setAccessible(true);
+                        ReflectionUtils.setField(field, timeRange, Long.MIN_VALUE);
+                        seg.setTimeRange(timeRange);
+                    } catch (Exception e) {
+                        Assert.fail(e.getMessage());
+                    }
+                });
+            });
+            val df = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, "3e560d22-b749-48c3-9f64-d4230207f120");
+            Assert.assertEquals(1, df.columns().length);
+        }
+        ss.stop();
+    }
+
+    @Test
+    void testCuboidTableOfBatchModel() {
+        val ss = SparkSession.builder().appName("local").master("local[1]").getOrCreate();
+        val config = KylinConfig.getInstanceFromEnv();
+        val dataflowManager = NDataflowManager.getInstance(config, "streaming_test");
+        val dataflow = dataflowManager.getDataflow("cd2b9a23-699c-4699-b0dd-38c9412b3dfd");
+        Assert.assertFalse(dataflow.isStreaming());
+        val kylinDataFrameManager = Mockito.spy(new KylinDataFrameManager(ss));
+        kylinDataFrameManager.option("isFastBitmapEnabled", "false");
+        val layoutEntity = new LayoutEntity();
+        {
+            val df = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, "86b5daaa-e295-4e8c-b877-f97bda69bee5");
+            Assert.assertEquals(0, df.columns().length);
+        }
+        ss.stop();
+    }
+}


[kylin] 01/15: fix ts query pushdown after force ts

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit d7269e764109a32664e48a56bfd9484192e1cf9c
Author: Shuai li <lo...@live.cn>
AuthorDate: Thu Oct 20 11:50:44 2022 +0800

    fix ts query pushdown after force ts
    
    Co-authored-by: shuai.li <sh...@kyligence.io>
---
 .../engine/exec/sparder/SparderQueryPlanExec.java  |  8 ++++
 .../kap/secondstorage/SecondStorageLockTest.java   | 50 ++++++++++++++++------
 2 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java b/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java
index bbe7d25192..225c110baf 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java
@@ -124,6 +124,7 @@ public class SparderQueryPlanExec implements QueryPlanExec {
                     }
                     QueryContext.current().setLastFailed(true);
                     cause = retryException;
+                    checkOnlyTsAnswer();
                 }
             }
             if (forceTableIndexAtException(e)) {
@@ -207,4 +208,11 @@ public class SparderQueryPlanExec implements QueryPlanExec {
             }
         }
     }
+
+    private void checkOnlyTsAnswer() {
+        if (QueryContext.current().getForcedToTieredStorage() == ForceToTieredStorage.CH_FAIL_TO_RETURN) {
+            throw new KylinException(QueryErrorCode.FORCED_TO_TIEREDSTORAGE_RETURN_ERROR,
+                    MsgPicker.getMsg().getForcedToTieredstorageReturnError());
+        }
+    }
 }
diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageLockTest.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageLockTest.java
index ffb4340233..006c03a3ee 100644
--- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageLockTest.java
+++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageLockTest.java
@@ -2511,7 +2511,9 @@ public class SecondStorageLockTest implements JobWaiter {
             final String catalog = "default";
             Unsafe.setProperty(ClickHouseLoad.SOURCE_URL, getSourceUrl());
             Unsafe.setProperty(ClickHouseLoad.ROOT_PATH, getLocalWorkingDirectory());
-            configClickhouseWith(new JdbcDatabaseContainer[] { clickhouse1, clickhouse2 }, 1, catalog, () -> {
+            val container = new JdbcDatabaseContainer[] { clickhouse1, clickhouse2 };
+            int replica = 1;
+            configClickhouseWith(container, replica, catalog, () -> {
                 secondStorageService.changeProjectSecondStorageState(getProject(),
                         SecondStorageNodeHelper.getAllPairs(), true);
                 Assert.assertEquals(2, SecondStorageUtil.listProjectNodes(getProject()).size());
@@ -2525,6 +2527,8 @@ public class SecondStorageLockTest implements JobWaiter {
                 String sql = "select CAL_DT from TEST_KYLIN_FACT where CAL_DT between '2012-01-01' and '2012-01-02'";
                 Map<String, Map<String, Boolean>> nodeStatusMap;
 
+                testForceToTSAndChDown(sql, container, replica);
+                        
                 {
                     // testGroupNodeDownForceToTierStorageOK
                     clearQueryContext();
@@ -2606,28 +2610,46 @@ public class SecondStorageLockTest implements JobWaiter {
                     }
                     triggerClickHouseJob(getDataFlow());
                 }
-
-                {
-                    testReverseForceToTierStorageWhenCHUnavailable(sql);
-                }
-
-                {
-                    testReverseForceToTierStorageWhenCHOK(sql);
-                }
-
+                
+                testReverseForceToTierStorageWhenCHUnavailable(sql);
+                testReverseForceToTierStorageWhenCHOK(sql);
+                
                 //reset status
                 nodeStatusMap = ImmutableMap.of("pair0", ImmutableMap.of("node00", true), "pair1",
                         ImmutableMap.of("node01", true));
                 secondStorageEndpoint.updateNodeStatus(nodeStatusMap);
 
-                {
-                    testForceToTierStorageShutTierStorage(sql);
-                }
-
+                testForceToTierStorageShutTierStorage(sql);
                 return true;
             });
         }
     }
+    
+    @SneakyThrows
+    private void testForceToTSAndChDown(String sql, JdbcDatabaseContainer<?>[] container, int replica) {
+        ExecAndComp.queryModel(getProject(), sql);
+        OLAPContext.getNativeRealizations().stream().findFirst().ifPresent(r -> assertTrue(r.isSecondStorage()));
+
+        for (JdbcDatabaseContainer<?> clickhouse : container) {
+            clickhouse.stop();
+        }
+        clearQueryContext();
+        QueryContext queryContext = QueryContext.current();
+        queryContext.setForcedToTieredStorage(ForceToTieredStorage.CH_FAIL_TO_RETURN);
+        queryContext.setForceTableIndex(true);
+        assertThrows(SQLException.class, () -> ExecAndComp.queryModel(getProject(), sql));
+
+        clearQueryContext();
+        queryContext = QueryContext.current();
+        queryContext.setForcedToTieredStorage(ForceToTieredStorage.CH_FAIL_TO_DFS);
+        queryContext.setForceTableIndex(false);
+        ExecAndComp.queryModel(getProject(), sql);
+
+        for (JdbcDatabaseContainer<?> clickhouse : container) {
+            clickhouse.start();
+        }
+        ClickHouseUtils.internalConfigClickHouse(container, replica);
+    }
 
     private void testReverseForceToTierStorageWhenCHUnavailable(String sql) {
         // testReverseForceToTierStorageWhenCHUnavailable

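Taken together, the patch makes the retry path honor the forced-to-tiered-storage mode: CH_FAIL_TO_DFS lets a failed tiered-storage query fall back and be retried against DFS, while CH_FAIL_TO_RETURN must surface an error rather than silently pushing the query down. A condensed sketch of that decision, mirroring checkOnlyTsAnswer() above (the concrete KylinException is simplified here):

    // Sketch only, not the full SparderQueryPlanExec retry loop.
    void onTieredStorageFailure(QueryContext ctx) {
        if (ctx.getForcedToTieredStorage() == ForceToTieredStorage.CH_FAIL_TO_RETURN) {
            // The caller demanded tiered storage or nothing: fail fast.
            throw new IllegalStateException("forced to tiered storage, fallback not allowed");
        }
        // CH_FAIL_TO_DFS: fall through so the query is retried on the table index/DFS.
    }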

[kylin] 07/15: KYLIN-5353 fix model saving failure when there are CCs of timestampdiff/timestampadd function

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 466263216e1007156438c719c54feab67c907aff
Author: Dorris Zhang <ru...@kyligence.io>
AuthorDate: Fri Oct 21 15:16:10 2022 +0800

    KYLIN-5353 fix model saving failure when there are CCs of timestampdiff/timestampadd function
    
    from 39687
    
    Co-authored-by: Dorris Zhang <ru...@kyligence.io>
---
 .../main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
index 6e029b6b4e..808b732f7a 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
@@ -1114,7 +1114,7 @@ public class ModelSemanticHelper extends BasicService {
         return computedColumnDescs.stream().map(ccDesc -> {
             AtomicBoolean isValidCC = new AtomicBoolean(true);
             List<Pair<String, String>> colsWithAlias = ComputedColumnUtil.ExprIdentifierFinder
-                    .getExprIdentifiers(ccDesc.getInnerExpression());
+                    .getExprIdentifiers(ccDesc.getExpression());
             colsWithAlias.forEach(c -> {
                 String column = c.getFirst() + "." + c.getSecond();
                 if (!aliasDotColSet.contains(column)) {

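The root cause: for computed columns built on TIMESTAMPDIFF/TIMESTAMPADD, the stored inner (rewritten) expression differs from what the user typed, so identifiers extracted from it need not match the model's known alias.column pairs and the save was rejected. Validating against the user-entered expression avoids the false mismatch. A minimal sketch of the corrected check, using the names from the patch:

    // Extract alias/column pairs from the user-entered expression,
    // not the internally rewritten one.
    List<Pair<String, String>> colsWithAlias = ComputedColumnUtil.ExprIdentifierFinder
            .getExprIdentifiers(ccDesc.getExpression()); // was getInnerExpression()
    colsWithAlias.forEach(c -> {
        String column = c.getFirst() + "." + c.getSecond();
        if (!aliasDotColSet.contains(column)) {
            isValidCC.set(false);
        }
    });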

[kylin] 04/15: KYLIN-5350 Update spark to 3.2.0-kylin-4.6.1.0-SNAPSHOT to fix high risk vulnerability

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 01bac4ea3bc43ac9da57848397bc854aa30842c1
Author: huangsheng <hu...@163.com>
AuthorDate: Mon Oct 24 10:46:25 2022 +0800

    KYLIN-5350 Update spark to 3.2.0-kylin-4.6.1.0-SNAPSHOT to fix high risk vulnerability
    
    Co-authored-by: huangsheng <hu...@163.com>
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 0a68c32b53..5c1ba4eecc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,7 +65,7 @@
 
         <!-- Spark versions -->
         <delta.version>1.2.1</delta.version>
-        <spark.version>3.2.0-kylin-4.5.20.0</spark.version>
+        <spark.version>3.2.0-kylin-4.6.1.0-SNAPSHOT</spark.version>
 
         <roaring.version>0.9.2-kylin-r4</roaring.version>
 


[kylin] 11/15: KYLIN-5364 Support case-insensitive when using table reloading api

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 03c6a0f4ea53de09b17f54109d844e3b8a8c6518
Author: Xinglong Li <xi...@kyligence.io>
AuthorDate: Fri Sep 30 17:30:28 2022 +0800

    KYLIN-5364 Support case-insensitive when using table reloading api
    
    Co-authored-by: Xinglong.Li <xi...@kyligence.io>
---
 .../java/org/apache/kylin/rest/service/TableService.java   |  4 ++--
 .../apache/kylin/rest/service/TableReloadServiceTest.java  | 14 ++++++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/TableService.java b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/TableService.java
index fe8e38292e..f4c94be684 100644
--- a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -1072,9 +1072,9 @@ public class TableService extends BasicService {
 
     public OpenPreReloadTableResponse preProcessBeforeReloadWithoutFailFast(String project, String tableIdentity,
             boolean needDetails) throws Exception {
+        Preconditions.checkNotNull(tableIdentity, "table identity can not be null");
         aclEvaluate.checkProjectWritePermission(project);
-
-        val context = calcReloadContext(project, tableIdentity, false);
+        val context = calcReloadContext(project, tableIdentity.toUpperCase(Locale.ROOT), false);
         removeFusionModelBatchPart(project, context);
         PreReloadTableResponse preReloadTableResponse = preProcessBeforeReloadWithContext(project, context, needDetails);
 
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java
index 91e1b2a83e..b252929a93 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java
@@ -226,6 +226,20 @@ public class TableReloadServiceTest extends CSVSourceTestCase {
         Assert.assertEquals("BIGINT", model.getComputedColumnDescs().get(0).getDatatype());
     }
 
+    @Test
+    public void testPreProcess_UseCaseSensitiveTableIdentity() throws Exception {
+        NTableMetadataManager manager = NTableMetadataManager.getInstance(getTestConfig(), PROJECT);
+        TableDesc tableDesc = manager.getTableDesc("DEFAULT.TEST_KYLIN_FACT");
+        Assert.assertNotNull(tableDesc);
+        val response = tableService.preProcessBeforeReloadWithoutFailFast(PROJECT, "DEFAULT.TEST_KYLIN_FAct", false);
+        Assert.assertFalse(response.isHasDatasourceChanged());
+
+        // test table identity is null
+        thrown.expect(NullPointerException.class);
+        thrown.expectMessage("table identity can not be null");
+        tableService.preProcessBeforeReloadWithoutFailFast(PROJECT, null, false);
+    }
+
     private void dropModelWhen(Predicate<String> predicate) {
         modelService.listAllModelIdsInProject(PROJECT).stream().filter(predicate)
                 .forEach(id -> modelService.innerDropModel(id, PROJECT));

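Worth noting in the fix above: the identity is upper-cased with Locale.ROOT rather than the JVM default locale, so normalization is stable across environments (under a Turkish default locale, "i".toUpperCase() yields a dotted capital İ and would corrupt identifiers). A self-contained sketch of the same normalization:

    import java.util.Locale;

    class TableIdentityNormalizer {
        // Locale-independent upper-casing, as in the patch; null is rejected
        // up front with the same message the test expects.
        static String normalize(String tableIdentity) {
            if (tableIdentity == null) {
                throw new NullPointerException("table identity can not be null");
            }
            return tableIdentity.toUpperCase(Locale.ROOT);
        }
    }

    // normalize("default.test_kylin_FAct") -> "DEFAULT.TEST_KYLIN_FACT"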

[kylin] 10/15: KYLIN-5365 given an incorrect query queue, check-1700-spark-kystorage.sh prints the detail error message

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit e23c37715bd712a3d27a68199bc64e4e280c1f46
Author: huangsheng <hu...@163.com>
AuthorDate: Wed Oct 26 17:13:34 2022 +0800

    KYLIN-5365 given an incorrect query queue, check-1700-spark-kystorage.sh prints the detail error message
    
    Co-authored-by: huangsheng <hu...@163.com>
---
 .../main/java/org/apache/kylin/tool/setup/KapGetClusterInfo.java | 9 ++++++++-
 .../java/org/apache/kylin/tool/setup/YarnResourceInfoTool.java   | 5 +++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/src/tool/src/main/java/org/apache/kylin/tool/setup/KapGetClusterInfo.java b/src/tool/src/main/java/org/apache/kylin/tool/setup/KapGetClusterInfo.java
index 0292dc77ee..d3cf151d59 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/setup/KapGetClusterInfo.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/setup/KapGetClusterInfo.java
@@ -111,7 +111,7 @@ public class KapGetClusterInfo {
         val patternedLogger = new BufferedLogger(logger);
         val response = config.getCliCommandExecutor().execute(command, patternedLogger).getCmd();
         logger.info("yarn metrics response: {}", response);
-        Map<String, Integer> clusterMetricsInfos;
+        Map<String, Integer> clusterMetricsInfos = null;
         if (response == null) {
             throw new IllegalStateException(
                     "Cannot get yarn metrics with url: " + yarnMasterUrlBase + YARN_METRICS_SUFFIX);
@@ -135,6 +135,13 @@ public class KapGetClusterInfo {
                 } else {
                     clusterMetricsInfos = yarnClusterMetrics.getYarnResourceInfoByQueueName(this.queueName);
                 }
+
+                if (clusterMetricsInfos == null || clusterMetricsInfos.isEmpty()) {
+                    logger.error("The queue:{} is invalid, please check kylin.properties", this.queueName);
+                    Unsafe.systemExit(101);
+                    return;
+                }
+
                 clusterMetricsMap.put(AVAILABLE_VIRTUAL_CORE, clusterMetricsInfos.get(AVAILABLE_VIRTUAL_CORE));
                 clusterMetricsMap.put(AVAILABLE_MEMORY, clusterMetricsInfos.get(AVAILABLE_MEMORY));
                 return;
diff --git a/src/tool/src/main/java/org/apache/kylin/tool/setup/YarnResourceInfoTool.java b/src/tool/src/main/java/org/apache/kylin/tool/setup/YarnResourceInfoTool.java
index 3fa65d45eb..33984f9093 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/setup/YarnResourceInfoTool.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/setup/YarnResourceInfoTool.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -67,6 +68,10 @@ public class YarnResourceInfoTool {
         Map<String, Integer> clusterMetricsMap = new HashMap<>();
 
         QueueInfo queueInfo = yarnClient.getQueueInfo(queue);
+        if (queueInfo == null) {
+            return Collections.emptyMap();
+        }
+
         availableMB += queueInfo.getQueueStatistics().getAvailableMemoryMB();
         availableVirtualCores += queueInfo.getQueueStatistics().getAvailableVCores();
 

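The two changes cooperate: YarnResourceInfoTool now returns an empty map instead of dereferencing a null QueueInfo, and KapGetClusterInfo treats the empty result as an invalid queue, logging the cause and exiting with a distinct code so the check script can report it. A condensed caller-side sketch (simplified from the patch):

    Map<String, Integer> metrics = yarnClusterMetrics.getYarnResourceInfoByQueueName(queueName);
    if (metrics == null || metrics.isEmpty()) {
        logger.error("The queue:{} is invalid, please check kylin.properties", queueName);
        Unsafe.systemExit(101); // non-zero exit code check-1700-spark-kystorage.sh surfaces
        return;
    }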

[kylin] 02/15: KYLIN-5314 fix bean can not be autowired

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 4b859f88c91f68fa0aea85948e16b0a40a35df3d
Author: Pengfei Zhan <pe...@kyligence.io>
AuthorDate: Wed Oct 12 20:50:19 2022 +0800

    KYLIN-5314 fix bean can not be autowired
---
 .../src/main/java/org/apache/kylin/common/KylinConfigBase.java    | 1 -
 .../java/org/apache/kylin/rest/service/AbstractModelService.java  | 8 ++++----
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4d27043511..e8d777d013 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2931,7 +2931,6 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("spring.session.store-type", "");
     }
 
-
     public int getJdbcSessionMaxInactiveInterval() {
         return Integer.parseInt(getOptional("spring.session.timeout", "3600"));
     }
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/AbstractModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/AbstractModelService.java
index 849214bd34..b46314b2e1 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/AbstractModelService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/AbstractModelService.java
@@ -57,7 +57,7 @@ public class AbstractModelService extends BasicService {
     @Autowired
     public AccessService accessService;
 
-    public final void checkModelPermission(String project, String modelId) {
+    public void checkModelPermission(String project, String modelId) {
         String userName = aclEvaluate.getCurrentUserName();
         Set<String> groups = getCurrentUserGroups();
         if (AclPermissionUtil.isAdmin() || AclPermissionUtil.isAdminInProject(project, groups)) {
@@ -94,7 +94,7 @@ public class AbstractModelService extends BasicService {
         });
     }
 
-    public final NDataModel getModelById(String modelId, String project) {
+    public NDataModel getModelById(String modelId, String project) {
         NDataModelManager modelManager = getManager(NDataModelManager.class, project);
         NDataModel nDataModel = modelManager.getDataModelDesc(modelId);
         if (null == nDataModel) {
@@ -103,7 +103,7 @@ public class AbstractModelService extends BasicService {
         return nDataModel;
     }
 
-    public final NDataModel getModelByAlias(String modelAlias, String project) {
+    public NDataModel getModelByAlias(String modelAlias, String project) {
         NDataModelManager modelManager = getManager(NDataModelManager.class, project);
         NDataModel nDataModel = modelManager.getDataModelDescByAlias(modelAlias);
         if (null == nDataModel) {
@@ -112,7 +112,7 @@ public class AbstractModelService extends BasicService {
         return nDataModel;
     }
 
-    public final Set<String> listAllModelIdsInProject(String project) {
+    public Set<String> listAllModelIdsInProject(String project) {
         NDataModelManager dataModelManager = getManager(NDataModelManager.class, project);
         return dataModelManager.listAllModelIds();
     }

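One plausible reading of this fix: AbstractModelService is proxied by a CGLIB-generated subclass (it has no interface), and final public methods cannot be overridden by that subclass, which can break proxy creation and hence autowiring of the bean. Removing final lets the proxy intercept every public method. A minimal illustration, with hypothetical class and method names:

    @Service
    public class ExampleModelService {
        // If this were final, the CGLIB subclass could not override it:
        // proxy-applied behavior (ACL checks, transactions) would be skipped,
        // or bean creation would fail outright.
        public String describeModel(String modelId) {
            return "model:" + modelId;
        }
    }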

[kylin] 06/15: KYLIN-5352 updating healthy model no need set id of the simplified measure

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 4ab0273f45054ae7712689d456615f7658419cdd
Author: Pengfei Zhan <pe...@kyligence.io>
AuthorDate: Fri Oct 21 23:29:10 2022 +0800

    KYLIN-5352 updating healthy model no need set id of the simplified measure
---
 .../kylin/rest/service/ModelSemanticHelper.java    |  85 +++++++------
 .../service/ModelServiceSemanticUpdateTest.java    | 133 +++++++++++++++++----
 .../kylin/rest/service/TableReloadServiceTest.java |  82 ++++++-------
 3 files changed, 201 insertions(+), 99 deletions(-)

diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
index 9471316401..6e029b6b4e 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
@@ -524,47 +524,12 @@ public class ModelSemanticHelper extends BasicService {
                     unusedColumn.setStatus(NDataModel.ColumnStatus.TOMB);
                     updateImpact.getRemovedOrUpdatedCCs().add(unusedColumn.getId());
                 });
-
-        Set<Integer> healthyExistedMeasures = Sets.newHashSet();
-        List<String> illegalSimplifiedMeasures = Lists.newArrayList();
-
-        Map<String, Integer> nameToIdOfSimplified = Maps.newHashMap();
-        Set<Integer> idOfSimplified = Sets.newHashSet();
-        for (SimplifiedMeasure measure : request.getSimplifiedMeasures()) {
-            nameToIdOfSimplified.put(measure.getName(), measure.getId());
-            if (measure.getId() != 0) {
-                idOfSimplified.add(measure.getId());
-            }
-        }
-
-        List<Measure> nonCountStarExistedMeasures = originModel.getAllMeasures().stream()
-                .filter(measure -> !measure.getName().equals("COUNT_ALL")).filter(measure -> !measure.isTomb())
-                .collect(Collectors.toList());
-        Map<String, Integer> nameToIdOfExistedModel = nonCountStarExistedMeasures.stream()
-                .collect(Collectors.toMap(MeasureDesc::getName, Measure::getId));
-        nameToIdOfExistedModel.forEach((name, id) -> {
-            if (!nameToIdOfSimplified.containsKey(name)) {
-                if (idOfSimplified.contains(id)) {
-                    healthyExistedMeasures.add(id);
-                }
-            } else if (nameToIdOfSimplified.get(name) == 0) {
-                illegalSimplifiedMeasures.add(name);
-            } else {
-                healthyExistedMeasures.add(id);
-            }
-        });
-
-        if (!illegalSimplifiedMeasures.isEmpty()) {
-            throw new KylinException(SIMPLIFIED_MEASURES_MISSING_ID, String.join(",", illegalSimplifiedMeasures));
+        Set<String> allFunctions = originModel.getEffectiveMeasures().values().stream()
+                .map(measure -> measure.getFunction().toString()).collect(Collectors.toSet());
+        if (allFunctions.size() != originModel.getEffectiveMeasures().size()) {
+            fixDupMeasureNames(originModel, request);
         }
 
-        nonCountStarExistedMeasures.stream() //
-                .filter(measure -> !healthyExistedMeasures.contains(measure.getId())) //
-                .forEach(measure -> {
-                    log.warn("the measure({}) has been handled to tomb", measure.getName());
-                    measure.setTomb(true);
-                });
-
         // move deleted CC's measure to TOMB
         List<Measure> currentMeasures = originModel.getEffectiveMeasures().values().asList();
         currentMeasures.stream().filter(measure -> {
@@ -620,6 +585,48 @@ public class ModelSemanticHelper extends BasicService {
         return updateImpact;
     }
 
+    private void fixDupMeasureNames(NDataModel originModel, ModelRequest request) {
+        Set<Integer> healthyExistedMeasures = Sets.newHashSet();
+        List<String> illegalSimplifiedMeasures = Lists.newArrayList();
+
+        Map<String, Integer> nameToIdOfSimplified = Maps.newHashMap();
+        Set<Integer> idOfSimplified = Sets.newHashSet();
+        for (SimplifiedMeasure measure : request.getSimplifiedMeasures()) {
+            nameToIdOfSimplified.put(measure.getName(), measure.getId());
+            if (measure.getId() != 0) {
+                idOfSimplified.add(measure.getId());
+            }
+        }
+
+        List<Measure> nonCountStarExistedMeasures = originModel.getAllMeasures().stream()
+                .filter(measure -> !measure.getName().equals("COUNT_ALL")).filter(measure -> !measure.isTomb())
+                .collect(Collectors.toList());
+        Map<String, Integer> nameToIdOfExistedModel = nonCountStarExistedMeasures.stream()
+                .collect(Collectors.toMap(MeasureDesc::getName, Measure::getId));
+        nameToIdOfExistedModel.forEach((name, id) -> {
+            if (!nameToIdOfSimplified.containsKey(name)) {
+                if (idOfSimplified.contains(id)) {
+                    healthyExistedMeasures.add(id);
+                }
+            } else if (nameToIdOfSimplified.get(name) == 0) {
+                illegalSimplifiedMeasures.add(name);
+            } else {
+                healthyExistedMeasures.add(id);
+            }
+        });
+
+        if (!illegalSimplifiedMeasures.isEmpty()) {
+            throw new KylinException(SIMPLIFIED_MEASURES_MISSING_ID, String.join(",", illegalSimplifiedMeasures));
+        }
+
+        nonCountStarExistedMeasures.stream() //
+                .filter(measure -> !healthyExistedMeasures.contains(measure.getId())) //
+                .forEach(measure -> {
+                    log.warn("the measure({}) has been handled to tomb", measure.getName());
+                    measure.setTomb(true);
+                });
+    }
+
     /**
      *  one measure in expectedModel but not in originModel then add one
      *  one in expectedModel, is also a TOMB one in originModel, set status to not TOMB
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java
index c39e2153ac..f0ddc80d02 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java
@@ -20,13 +20,17 @@ package org.apache.kylin.rest.service;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.SIMPLIFIED_MEASURES_MISSING_ID;
 import static org.hamcrest.Matchers.is;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.kylin.engine.spark.job.ExecutableAddCuboidHandler;
-import org.apache.kylin.engine.spark.job.NSparkCubingJob;
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-import lombok.var;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -36,6 +40,8 @@ import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.cube.model.SelectRule;
 import org.apache.kylin.engine.spark.ExecutableUtils;
+import org.apache.kylin.engine.spark.job.ExecutableAddCuboidHandler;
+import org.apache.kylin.engine.spark.job.NSparkCubingJob;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.NExecutableManager;
@@ -50,6 +56,7 @@ import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
 import org.apache.kylin.metadata.cube.model.RuleBasedIndex;
 import org.apache.kylin.metadata.model.ComputedColumnDesc;
+import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.ManagementType;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.NDataModel;
@@ -57,7 +64,9 @@ import org.apache.kylin.metadata.model.NDataModel.ColumnStatus;
 import org.apache.kylin.metadata.model.NDataModel.Measure;
 import org.apache.kylin.metadata.model.NDataModel.NamedColumn;
 import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
 import org.apache.kylin.rest.constant.Constant;
@@ -84,16 +93,14 @@ import org.springframework.security.authentication.TestingAuthenticationToken;
 import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.test.util.ReflectionTestUtils;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import lombok.val;
+import lombok.var;
+import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class ModelServiceSemanticUpdateTest extends NLocalFileMetadataTestCase {
@@ -411,25 +418,111 @@ public class ModelServiceSemanticUpdateTest extends NLocalFileMetadataTestCase {
 
     @Test
     public void testRenameTableAliasUsedWithSimplifiedMeasure() throws IOException {
-        val modelManager = NDataModelManager.getInstance(getTestConfig(), getProject());
+        String project = getProject();
+        val modelManager = NDataModelManager.getInstance(getTestConfig(), project);
         modelManager.listAllModels().forEach(modelManager::dropModel);
         val request = JsonUtil.readValue(
                 getClass().getResourceAsStream("/ut_request/model_update/model_with_measure.json"), ModelRequest.class);
         request.setAlias("model_with_measure");
-        val newModel = modelService.createModel(request.getProject(), request);
+        val newModel = modelService.createModel(project, request);
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+            // prepare dirty model
+            NDataModelManager modelMgr = NDataModelManager.getInstance(getTestConfig(), project);
+            modelMgr.updateDataModel(newModel.getId(), copyForWrite -> {
+                List<Measure> allMeasures = copyForWrite.getAllMeasures();
+                Measure measure = new Measure();
+                measure.setId(100002);
+                measure.setType(NDataModel.MeasureType.NORMAL);
+                measure.setName("MAX2");
+                FunctionDesc function = new FunctionDesc();
+                function.setExpression("MAX");
+                function.setReturnType("integer");
+                function.setConfiguration(Maps.newLinkedHashMap());
+                ParameterDesc parameter = new ParameterDesc();
+                parameter.setType("column");
+                parameter.setValue("TEST_ACCOUNT.ACCOUNT_SELLER_LEVEL");
+                parameter.setColRef(allMeasures.get(0).getFunction().getParameters().get(0).getColRef());
+                function.setParameters(ImmutableList.of(parameter));
+                measure.setFunction(function);
+                allMeasures.add(measure);
+            });
+            return null;
+        }, project);
         val updateRequest = JsonUtil.readValue(
                 getClass().getResourceAsStream("/ut_request/model_update/model_with_measure_change_alias.json"),
                 ModelRequest.class);
         updateRequest.setAlias("model_with_measure_change_alias");
         updateRequest.setUuid(newModel.getUuid());
+        List<SimplifiedMeasure> simplifiedMeasures = updateRequest.getSimplifiedMeasures();
+        simplifiedMeasures.get(0).setId(100000);
+        simplifiedMeasures.get(1).setId(100001);
+        SimplifiedMeasure simplifiedMeasure = new SimplifiedMeasure();
+        ParameterResponse param = new ParameterResponse();
+        param.setType("column");
+        param.setValue("TEST_ACCOUNT.ACCOUNT_SELLER_LEVEL");
+        simplifiedMeasure.setParameterValue(ImmutableList.of(param));
+        simplifiedMeasure.setExpression("MAX");
+        simplifiedMeasure.setName("MAX2");
+        simplifiedMeasure.setReturnType("integer");
+        simplifiedMeasures.add(simplifiedMeasure);
         try {
-            modelService.updateDataModelSemantic(getProject(), updateRequest);
+            modelService.updateDataModelSemantic(project, updateRequest);
             Assert.fail();
         } catch (KylinException e) {
             Assert.assertEquals(SIMPLIFIED_MEASURES_MISSING_ID.getErrorCode().getCode(), e.getErrorCodeString());
         }
     }
 
+    @Test
+    public void testMockFixDirtyModelWhenSaving() throws IOException {
+        val modelManager = NDataModelManager.getInstance(getTestConfig(), getProject());
+        modelManager.listAllModels().forEach(modelManager::dropModel);
+        val request = JsonUtil.readValue(
+                getClass().getResourceAsStream("/ut_request/model_update/model_with_measure.json"), ModelRequest.class);
+        request.setAlias("model_with_measure");
+        val newModel = modelService.createModel(request.getProject(), request);
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+            // prepare dirty model
+            NDataModelManager modelMgr = NDataModelManager.getInstance(getTestConfig(), getProject());
+            modelMgr.updateDataModel(newModel.getId(), copyForWrite -> {
+                List<Measure> allMeasures = copyForWrite.getAllMeasures();
+                Measure measure = new Measure();
+                measure.setId(100002);
+                measure.setType(NDataModel.MeasureType.NORMAL);
+                measure.setName("MAX2");
+                FunctionDesc function = new FunctionDesc();
+                function.setExpression("MAX");
+                function.setReturnType("integer");
+                function.setConfiguration(Maps.newLinkedHashMap());
+                ParameterDesc parameter = new ParameterDesc();
+                parameter.setType("column");
+                parameter.setValue("TEST_ACCOUNT.ACCOUNT_SELLER_LEVEL");
+                parameter.setColRef(allMeasures.get(0).getFunction().getParameters().get(0).getColRef());
+                function.setParameters(ImmutableList.of(parameter));
+                measure.setFunction(function);
+                allMeasures.add(measure);
+            });
+            return null;
+        }, getProject());
+
+        // set max2 to tomb
+        val updateRequest = JsonUtil.readValue(
+                getClass().getResourceAsStream("/ut_request/model_update/model_with_measure_change_alias.json"),
+                ModelRequest.class);
+        updateRequest.setAlias("model_with_measure_change_alias");
+        updateRequest.setUuid(newModel.getUuid());
+        List<SimplifiedMeasure> simplifiedMeasures = updateRequest.getSimplifiedMeasures();
+        simplifiedMeasures.get(0).setId(100000);
+        simplifiedMeasures.get(1).setId(100001);
+        modelService.updateDataModelSemantic(getProject(), updateRequest);
+
+        NDataModel modifiedModel = modelManager.getDataModelDesc(newModel.getUuid());
+        List<Measure> allMeasures = modifiedModel.getAllMeasures();
+        Optional<Measure> max2 = allMeasures.stream().filter(measure -> measure.getName().equals("MAX2")).findFirst();
+        Assert.assertTrue(max2.isPresent());
+        Assert.assertTrue(max2.get().isTomb());
+    }
+
     @Test
     public void testRenameTableAliasUsedAsMeasure() throws Exception {
         val modelManager = NDataModelManager.getInstance(getTestConfig(), getProject());
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java
index 7008996d7d..91e1b2a83e 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java
@@ -17,16 +17,26 @@
  */
 package org.apache.kylin.rest.service;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import io.kyligence.kap.clickhouse.MockSecondStorage;
-import org.apache.kylin.engine.spark.job.NSparkCubingJob;
-import org.apache.kylin.engine.spark.job.NTableSamplingJob;
-import io.kyligence.kap.secondstorage.SecondStorageUtil;
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-import lombok.var;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.TABLE_RELOAD_HAVING_NOT_FINAL_JOB;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.is;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
@@ -36,6 +46,8 @@ import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.cube.model.SelectRule;
+import org.apache.kylin.engine.spark.job.NSparkCubingJob;
+import org.apache.kylin.engine.spark.job.NTableSamplingJob;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.JobTypeEnum;
 import org.apache.kylin.job.execution.NExecutableManager;
@@ -82,25 +94,15 @@ import org.springframework.security.core.Authentication;
 import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.test.util.ReflectionTestUtils;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
-import static org.apache.kylin.common.exception.code.ErrorCodeServer.TABLE_RELOAD_HAVING_NOT_FINAL_JOB;
-import static org.awaitility.Awaitility.await;
-import static org.hamcrest.CoreMatchers.is;
+import io.kyligence.kap.clickhouse.MockSecondStorage;
+import io.kyligence.kap.secondstorage.SecondStorageUtil;
+import lombok.val;
+import lombok.var;
+import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class TableReloadServiceTest extends CSVSourceTestCase {
@@ -628,7 +630,7 @@ public class TableReloadServiceTest extends CSVSourceTestCase {
         Assert.assertNotNull(reModel);
         Assert.assertFalse(reModel.isBroken());
         Assert.assertEquals(9, reModel.getJoinTables().size());
-        Assert.assertEquals(18, reModel.getAllMeasures().size());
+        Assert.assertEquals(17, reModel.getAllMeasures().size());
         Assert.assertEquals(198, reModel.getAllNamedColumns().size());
         Assert.assertEquals("ORDER_ID", reModel.getAllNamedColumns().get(13).getName());
         Assert.assertEquals(NDataModel.ColumnStatus.TOMB, reModel.getAllNamedColumns().get(13).getStatus());
@@ -690,7 +692,7 @@ public class TableReloadServiceTest extends CSVSourceTestCase {
         Assert.assertNotNull(reModel);
         Assert.assertFalse(reModel.isBroken());
         Assert.assertEquals(9, reModel.getJoinTables().size());
-        Assert.assertEquals(18, reModel.getAllMeasures().size());
+        Assert.assertEquals(17, reModel.getAllMeasures().size());
         Assert.assertEquals(198, reModel.getAllNamedColumns().size());
         Assert.assertEquals("CAL_DT", reModel.getAllNamedColumns().get(2).getName());
         Assert.assertEquals("DEAL_YEAR", reModel.getAllNamedColumns().get(28).getName());
@@ -740,7 +742,7 @@ public class TableReloadServiceTest extends CSVSourceTestCase {
         Assert.assertNotNull(reModel);
         Assert.assertFalse(reModel.isBroken());
         Assert.assertEquals(9, reModel.getJoinTables().size());
-        Assert.assertEquals(18, reModel.getAllMeasures().size());
+        Assert.assertEquals(17, reModel.getAllMeasures().size());
         Assert.assertEquals(198, reModel.getAllNamedColumns().size());
         Assert.assertEquals("CAL_DT", reModel.getAllNamedColumns().get(2).getName());
         Assert.assertEquals("DEAL_YEAR", reModel.getAllNamedColumns().get(28).getName());
@@ -1054,8 +1056,8 @@ public class TableReloadServiceTest extends CSVSourceTestCase {
         executableManager.addJob(job);
         removeColumn(tableIdentity, "TEST_TIME_ENC");
 
-        OpenPreReloadTableResponse response = tableService.preProcessBeforeReloadWithoutFailFast(PROJECT,
-                tableIdentity, false);
+        OpenPreReloadTableResponse response = tableService.preProcessBeforeReloadWithoutFailFast(PROJECT, tableIdentity,
+                false);
         Assert.assertTrue(response.isHasEffectedJobs());
         Assert.assertEquals(1, response.getEffectedJobs().size());
         Assert.assertThrows(TABLE_RELOAD_HAVING_NOT_FINAL_JOB.getMsg(job.getId()), KylinException.class,
@@ -1071,8 +1073,8 @@ public class TableReloadServiceTest extends CSVSourceTestCase {
         job.setJobType(JobTypeEnum.TABLE_SAMPLING);
         executableManager.addJob(job);
         addColumn(tableIdentity, true, new ColumnDesc("", "TEST_COL", "int", "", "", "", null));
-        OpenPreReloadTableResponse response = tableService.preProcessBeforeReloadWithoutFailFast(PROJECT,
-                tableIdentity, false);
+        OpenPreReloadTableResponse response = tableService.preProcessBeforeReloadWithoutFailFast(PROJECT, tableIdentity,
+                false);
         Assert.assertTrue(response.isHasEffectedJobs());
         Assert.assertEquals(1, response.getEffectedJobs().size());
         tableService.preProcessBeforeReloadWithFailFast(PROJECT, tableIdentity);
@@ -1091,8 +1093,8 @@ public class TableReloadServiceTest extends CSVSourceTestCase {
                 put("SLR_SEGMENT_CD", "bigint");
             }
         }, true);
-        OpenPreReloadTableResponse response = tableService.preProcessBeforeReloadWithoutFailFast(PROJECT,
-                tableIdentity, false);
+        OpenPreReloadTableResponse response = tableService.preProcessBeforeReloadWithoutFailFast(PROJECT, tableIdentity,
+                false);
         Assert.assertTrue(response.isHasEffectedJobs());
         Assert.assertEquals(1, response.getEffectedJobs().size());
         Assert.assertThrows(TABLE_RELOAD_HAVING_NOT_FINAL_JOB.getMsg(job.getId()), KylinException.class,
@@ -1626,7 +1628,7 @@ public class TableReloadServiceTest extends CSVSourceTestCase {
     }
 
     @Test
-    public void testReloadAWSTableCompatibleCrossAccountNoSample(){
+    public void testReloadAWSTableCompatibleCrossAccountNoSample() {
         S3TableExtInfo tableExtInfo = prepareTableExtInfo("DEFAULT.TEST_ORDER", "endpoint", "role");
         prepareTableExt("DEFAULT.TEST_ORDER");
         tableService.reloadAWSTableCompatibleCrossAccount(PROJECT, tableExtInfo, false, 10000, true, 3, null);
@@ -1648,13 +1650,13 @@ public class TableReloadServiceTest extends CSVSourceTestCase {
     }
 
     @Test(expected = Exception.class)
-    public void testReloadAWSTableCompatibleCrossAccountNeedSample(){
+    public void testReloadAWSTableCompatibleCrossAccountNeedSample() {
         S3TableExtInfo tableExtInfo = prepareTableExtInfo("DEFAULT.TEST_ORDER", "endpoint", "role");
         prepareTableExt("DEFAULT.TEST_ORDER");
         tableService.reloadAWSTableCompatibleCrossAccount(PROJECT, tableExtInfo, true, 10000, true, 3, null);
     }
 
-    private S3TableExtInfo prepareTableExtInfo(String dbTable, String endpoint, String role){
+    private S3TableExtInfo prepareTableExtInfo(String dbTable, String endpoint, String role) {
         S3TableExtInfo tableExtInfo = new S3TableExtInfo();
         tableExtInfo.setName(dbTable);
         tableExtInfo.setEndpoint(endpoint);

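The gate added in ModelSemanticHelper boils down to a set-versus-map size comparison: duplicate function signatures collapse in the Set, so a size mismatch means at least two effective measures compute the same function, and only then does the name/id reconciliation in fixDupMeasureNames run. A sketch of that check:

    Set<String> signatures = originModel.getEffectiveMeasures().values().stream()
            .map(measure -> measure.getFunction().toString())
            .collect(Collectors.toSet());
    // Smaller set than map: duplicates exist, run fixDupMeasureNames.
    boolean needsFix = signatures.size() != originModel.getEffectiveMeasures().size();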

[kylin] 14/15: KYLIN-5363 Fix the problem of job metadata change when the number of segments in parallel build is too large

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 6cb5d889e7d3961460ba8374f0fac82a97153012
Author: sibingzhang <74...@users.noreply.github.com>
AuthorDate: Fri Oct 28 15:33:09 2022 +0800

    KYLIN-5363 Fix the problem of job metadata change when the number of segments in parallel build is too large
    
    from 7124
    
    Co-authored-by: sibing.zhang <19...@qq.com>
---
 .../org/apache/kylin/job/dao/NExecutableDao.java   | 25 ++++++++++++
 .../kylin/job/execution/NExecutableManager.java    | 17 ++++++--
 .../kylin/job/execution/DagExecutableTest.java     |  4 ++
 .../org/apache/kylin/rest/service/JobService.java  |  1 +
 .../kylin/rest/service/DagJobServiceTest.java      |  4 ++
 .../apache/kylin/rest/service/JobErrorTest.java    |  4 +-
 .../apache/kylin/rest/service/JobServiceTest.java  | 27 ++++++++++++-
 .../org/apache/kylin/rest/service/StageTest.java   | 47 ++++++++++++++++++++++
 .../kylin/engine/spark/job/NSparkExecutable.java   |  5 ++-
 9 files changed, 127 insertions(+), 7 deletions(-)

diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
index 5c353d989c..f95779e96f 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
@@ -19,7 +19,9 @@
 package org.apache.kylin.job.dao;
 
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -61,6 +63,8 @@ public class NExecutableDao {
 
     private CachedCrudAssist<ExecutablePO> crud;
 
+    private Map<String, ExecutablePO> updating = new HashMap<>();
+
     private NExecutableDao(KylinConfig config, String project) {
         logger.trace("Using metadata url: {}", config);
         this.project = project;
@@ -126,6 +130,27 @@ public class NExecutableDao {
         }
     }
 
+    public void updateJobWithoutSave(String uuid, Predicate<ExecutablePO> updater) {
+        ExecutablePO executablePO;
+        if (updating.containsKey(uuid)) {
+            executablePO = updating.get(uuid);
+        } else {
+            ExecutablePO executablePOFromCache = getJobByUuid(uuid);
+            Preconditions.checkNotNull(executablePOFromCache);
+            val copyForWrite = JsonUtil.copyBySerialization(executablePOFromCache, JOB_SERIALIZER, null);
+            updating.put(uuid, copyForWrite);
+            executablePO = copyForWrite;
+        }
+        updater.test(executablePO);
+    }
+
+    public void saveUpdatedJob() {
+        for (ExecutablePO executablePO : updating.values()) {
+            crud.save(executablePO);
+        }
+        updating = new HashMap<>();
+    }
+
     private ResourceStore getStore() {
         return ResourceStore.getKylinMetaStore(config);
     }
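
The new DAO methods form a small write-behind batch: updateJobWithoutSave mutates a serialized copy cached in the updating map (one copy per job id), and saveUpdatedJob flushes every pending copy in one pass before resetting the map, so repeated per-segment stage updates no longer cost one metadata write each. A condensed sketch of the pattern with simplified types (not the actual DAO):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.BiConsumer;
    import java.util.function.Consumer;
    import java.util.function.Function;

    class BatchingStore<T> {
        private final Map<String, T> pending = new HashMap<>();
        private final Function<String, T> loadCopy;   // deep-copies the stored record
        private final BiConsumer<String, T> persist;  // writes one record back

        BatchingStore(Function<String, T> loadCopy, BiConsumer<String, T> persist) {
            this.loadCopy = loadCopy;
            this.persist = persist;
        }

        // Apply a mutation to the cached copy; nothing is written yet.
        void update(String id, Consumer<T> mutator) {
            mutator.accept(pending.computeIfAbsent(id, loadCopy));
        }

        // Persist every mutated copy once, then reset the batch.
        void flush() {
            pending.forEach(persist);
            pending.clear();
        }
    }
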
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
index 00dfcd6a1c..a161751468 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
@@ -281,6 +281,10 @@ public class NExecutableManager {
         }
     }
 
+    public void saveUpdatedJob() {
+        executableDao.saveUpdatedJob();
+    }
+
     public Set<String> getYarnApplicationJobs(String id) {
         ExecutablePO executablePO = executableDao.getJobByUuid(id);
         String appIds = executablePO.getOutput().getInfo().getOrDefault(YARN_APP_IDS, "");
@@ -785,6 +789,7 @@ public class NExecutableManager {
                                     .forEach(stage -> //
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.READY, null, null));
                         }
+                        saveUpdatedJob();
                     }
                 }
             });
@@ -866,6 +871,7 @@ public class NExecutableManager {
                                     .forEach(stage -> // when restart, reset stage
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.READY, null, null, true));
                         }
+                        saveUpdatedJob();
                     }
                 }
 
@@ -908,6 +914,7 @@ public class NExecutableManager {
                                     .forEach(stage -> //
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.SUICIDAL, null, null));
                         }
+                        saveUpdatedJob();
                     }
                 }
             });
@@ -933,6 +940,7 @@ public class NExecutableManager {
                                     .forEach(stage -> //
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.DISCARDED, null, null));
                         }
+                        saveUpdatedJob();
                     }
                 }
             });
@@ -963,6 +971,7 @@ public class NExecutableManager {
                                     .forEach(stage -> //
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.ERROR, null, null));
                         }
+                        saveUpdatedJob();
                     }
                 }
             });
@@ -1002,6 +1011,7 @@ public class NExecutableManager {
                                     .forEach(stage -> //
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.PAUSED, null, null));
                         }
+                        saveUpdatedJob();
                     }
                 }
             });
@@ -1158,7 +1168,7 @@ public class NExecutableManager {
     public void updateStageStatus(String taskOrJobId, String segmentId, ExecutableState newStatus,
             Map<String, String> updateInfo, String failedMsg, Boolean isRestart) {
         val jobId = extractJobId(taskOrJobId);
-        executableDao.updateJob(jobId, job -> {
+        executableDao.updateJobWithoutSave(jobId, job -> {
             final List<Map<String, List<ExecutablePO>>> collect = job.getTasks().stream()//
                     .map(ExecutablePO::getStagesMap)//
                     .filter(MapUtils::isNotEmpty)//
@@ -1253,6 +1263,7 @@ public class NExecutableManager {
                                     .forEach(stage -> //
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.SUCCEED, null, null));
                         }
+                        saveUpdatedJob();
                     }
                 }
             });
@@ -1278,6 +1289,7 @@ public class NExecutableManager {
                                     .forEach(stage -> //
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.ERROR, null, null));
                         }
+                        saveUpdatedJob();
                     }
                 }
             });
@@ -1371,8 +1383,7 @@ public class NExecutableManager {
         }
         val thread = scheduler.getContext().getRunningJobThread(executable);
         if (thread != null) {
-            logger.info("Interrupt Job [{}] thread and remove in ExecutableContext",
-                    executable.getDisplayName());
+            logger.info("Interrupt Job [{}] thread and remove in ExecutableContext", executable.getDisplayName());
             thread.interrupt();
             scheduler.getContext().removeRunningJob(executable);
         }
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
index f39969755d..9357a55159 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
@@ -644,6 +644,7 @@ class DagExecutableTest {
         manager.updateStageStatus(stage1.getId(), task.getId(), ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.RUNNING, null, null);
+        manager.saveUpdatedJob();
         await().pollDelay(Duration.ONE_SECOND).until(() -> true);
         manager.updateJobOutput(job.getId(), ExecutableState.SUCCEED);
         manager.updateJobOutput(task.getId(), ExecutableState.SUCCEED);
@@ -652,6 +653,7 @@ class DagExecutableTest {
         manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.SUCCEED, null, null);
         await().pollDelay(Duration.ONE_SECOND).until(() -> true);
         manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.SUCCEED, null, null);
+        manager.saveUpdatedJob();
 
         val taskDuration = task.getTaskDurationToTest(task);
         val expected = AbstractExecutable.getDuration(stage1.getOutput(task.getId()))
@@ -687,6 +689,7 @@ class DagExecutableTest {
         manager.updateStageStatus(stage1.getId(), task.getId(), ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.RUNNING, null, null);
+        manager.saveUpdatedJob();
         await().pollDelay(Duration.ONE_SECOND).until(() -> true);
         manager.updateJobOutput(job.getId(), ExecutableState.SUCCEED);
         manager.updateJobOutput(task.getId(), ExecutableState.SUCCEED);
@@ -695,6 +698,7 @@ class DagExecutableTest {
         manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.SUCCEED, null, null);
         await().pollDelay(Duration.ONE_SECOND).until(() -> true);
         manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.SUCCEED, null, null);
+        manager.saveUpdatedJob();
 
         val taskDuration = task.getTaskDurationToTest(task);
         val expected = task.getDuration();
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
index 22ea2b774a..08ef663424 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -987,6 +987,7 @@ public class JobService extends BasicService implements JobSupporter {
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
             val executableManager = getManager(NExecutableManager.class, project);
             executableManager.updateStageStatus(taskId, segmentId, newStatus, updateInfo, errMsg);
+            executableManager.saveUpdatedJob();
             return null;
         }, project, UnitOfWork.DEFAULT_MAX_RETRY, UnitOfWork.DEFAULT_EPOCH_ID, jobId);
     }
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java
index 2acffd1281..bbbed42b0b 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java
@@ -165,6 +165,7 @@ class DagJobServiceTest {
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
             manager.updateJobOutput(task1.getId(), ExecutableState.ERROR);
             manager.updateStageStatus(stage11.getId(), task1.getId(), ExecutableState.ERROR, null, null);
+            manager.saveUpdatedJob();
             return null;
         }, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID);
 
@@ -175,6 +176,7 @@ class DagJobServiceTest {
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
             manager.updateJobOutput(task3.getId(), ExecutableState.ERROR);
             manager.updateStageStatus(stage31.getId(), task3.getId(), ExecutableState.ERROR, null, null);
+            manager.saveUpdatedJob();
             return null;
         }, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID);
 
@@ -239,6 +241,7 @@ class DagJobServiceTest {
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
             manager.updateJobOutput(task1.getId(), ExecutableState.ERROR);
             manager.updateStageStatus(stage11.getId(), task1.getId(), ExecutableState.ERROR, null, null);
+            manager.saveUpdatedJob();
             return null;
         }, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID);
 
@@ -249,6 +252,7 @@ class DagJobServiceTest {
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
             manager.updateJobOutput(task3.getId(), ExecutableState.ERROR);
             manager.updateStageStatus(stage31.getId(), task3.getId(), ExecutableState.ERROR, null, null);
+            manager.saveUpdatedJob();
             return null;
         }, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID);
 
diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java
index d08373b613..c9c52fc563 100644
--- a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java
+++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java
@@ -387,7 +387,7 @@ public class JobErrorTest extends NLocalFileMetadataTestCase {
         manager.addJob(executable);
 
         var output = manager.getOutput(executable.getId());
-        final long[] duration = {AbstractExecutable.getDuration(output)};
+        final long[] duration = { AbstractExecutable.getDuration(output) };
         Assert.assertEquals(0, duration[0]);
 
         ((DefaultOutput) output).setStartTime(System.currentTimeMillis());
@@ -442,6 +442,7 @@ public class JobErrorTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), null, ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(logicStep2.getId(), null, ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(logicStep3.getId(), null, ExecutableState.RUNNING, null, null);
+        manager.saveUpdatedJob();
 
         val durationWithoutWaiteTime = executable.getDurationFromStepOrStageDurationSum();
 
@@ -487,6 +488,7 @@ public class JobErrorTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), null, ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(logicStep2.getId(), null, ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(logicStep3.getId(), null, ExecutableState.RUNNING, null, null);
+        manager.saveUpdatedJob();
 
         val durationWithoutWaiteTime = executable.getDurationFromStepOrStageDurationSum();
 
diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index e29d23a875..ee6db5a9be 100644
--- a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@ -578,6 +578,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SKIP, null, "test output");
         manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+        manager.saveUpdatedJob();
 
         var buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
@@ -587,6 +588,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
         manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
+        manager.saveUpdatedJob();
 
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
@@ -596,6 +598,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         Map<String, String> info = Maps.newHashMap();
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "1");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -603,6 +606,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "8");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -610,6 +614,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -617,18 +622,21 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "12");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
         assertTrue(1 == successLogicStep);
 
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
         assertTrue(1 == successLogicStep);
 
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -665,6 +673,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SKIP, null, "test output");
         manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+        manager.saveUpdatedJob();
 
         var buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
@@ -674,6 +683,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
         manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
+        manager.saveUpdatedJob();
 
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
@@ -683,6 +693,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         Map<String, String> info = Maps.newHashMap();
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "1");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -690,6 +701,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -697,6 +709,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -704,18 +717,21 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "12");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
         assertTrue(0.5 == successLogicStep);
 
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
         assertTrue(0.5 == successLogicStep);
 
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -751,6 +767,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SKIP, null, "test output");
         manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+        manager.saveUpdatedJob();
 
         var buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
@@ -760,6 +777,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
         manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
+        manager.saveUpdatedJob();
 
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
@@ -769,6 +787,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         Map<String, String> info = Maps.newHashMap();
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "1");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
         successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -778,6 +797,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "8");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
         successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -787,6 +807,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
         successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -796,6 +817,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "12");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
         successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -804,6 +826,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         assertTrue(1 == successLogicStep);
 
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
         successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -812,6 +835,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         assertTrue(1 == successLogicStep);
 
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
         successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -863,6 +887,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         assertEquals(logicStep.getId(), logicStepBase.getId());
 
         manager.updateStageStatus(logicStep.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
+        manager.saveUpdatedJob();
 
         List<ExecutableStepResponse> jobDetail = jobService.getJobDetail(project, executable.getId());
         assertEquals(1, jobDetail.size());
@@ -883,8 +908,8 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         assertTrue(logicStepResponse2.getExecStartTime() < System.currentTimeMillis());
 
         manager.updateStageStatus(logicStep.getId(), segmentId2, ExecutableState.RUNNING, null, "test output");
-
         manager.updateStageStatus(logicStep.getId(), null, ExecutableState.SUCCEED, null, "test output");
+        manager.saveUpdatedJob();
 
         jobDetail = jobService.getJobDetail(project, executable.getId());
         assertEquals(1, jobDetail.size());
diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
index c030785854..bf63baa1e2 100644
--- a/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
+++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.rest.service;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.kylin.engine.spark.job.step.NStageForBuild;
@@ -60,7 +61,9 @@ import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.test.util.ReflectionTestUtils;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -204,6 +207,7 @@ public class StageTest extends NLocalFileMetadataTestCase {
         manager.addJob(executable);
 
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, null, null, false);
+        manager.saveUpdatedJob();
 
         manager.updateStagePaused(executable);
 
@@ -256,6 +260,7 @@ public class StageTest extends NLocalFileMetadataTestCase {
 
         manager.updateJobOutput(executable.getId(), ExecutableState.RUNNING, null, null, null);
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, null, null, false);
+        manager.saveUpdatedJob();
         manager.updateJobOutput(executable.getId(), ExecutableState.SUCCEED, null, null, null);
         manager.makeStageSuccess(sparkExecutable.getId());
 
@@ -295,12 +300,14 @@ public class StageTest extends NLocalFileMetadataTestCase {
         manager.addJob(executable);
 
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, errMsg);
+        manager.saveUpdatedJob();
         var output1 = manager.getOutput(logicStep1.getId(), segmentId);
         Assert.assertEquals(ExecutableState.SUCCEED, output1.getState());
         Assert.assertEquals(output1.getShortErrMsg(), errMsg);
         Assert.assertTrue(MapUtils.isEmpty(output1.getExtra()));
 
         manager.updateStageStatus(logicStep1.getId(), null, ExecutableState.ERROR, null, errMsg);
+        manager.saveUpdatedJob();
         output1 = manager.getOutput(logicStep1.getId(), segmentId);
         Assert.assertEquals(ExecutableState.SUCCEED, output1.getState());
         Assert.assertEquals(output1.getShortErrMsg(), errMsg);
@@ -316,6 +323,45 @@ public class StageTest extends NLocalFileMetadataTestCase {
         Assert.assertTrue(MapUtils.isEmpty(outputLogicStep2.getExtra()));
     }
 
+    @Test
+    public void testUpdateStageStatusNoSaveCache() {
+        val segmentId = RandomUtil.randomUUIDStr();
+        val segmentId2 = RandomUtil.randomUUIDStr();
+
+        val manager = NExecutableManager.getInstance(jobService.getConfig(), getProject());
+        val executable = new SucceedChainedTestExecutable();
+
+        executable.setId(RandomUtil.randomUUIDStr());
+
+        val sparkExecutable = new NSparkExecutable();
+        sparkExecutable.setParam(NBatchConstants.P_SEGMENT_IDS, segmentId + "," + segmentId2);
+        sparkExecutable.setId(RandomUtil.randomUUIDStr());
+        executable.addTask(sparkExecutable);
+
+        val build1 = new NStageForBuild();
+        val build2 = new NStageForBuild();
+        val build3 = new NStageForBuild();
+        sparkExecutable.addStage(build1);
+        sparkExecutable.addStage(build2);
+        sparkExecutable.addStage(build3);
+        sparkExecutable.setStageMap();
+
+        manager.addJob(executable);
+
+        List<AbstractExecutable> tasks = executable.getTasks();
+        tasks.forEach(task -> {
+            final Map<String, List<StageBase>> tasksMap = ((ChainedStageExecutable) task).getStagesMap();
+            for (Map.Entry<String, List<StageBase>> entry : tasksMap.entrySet()) {
+                Optional.ofNullable(entry.getValue()).orElse(Lists.newArrayList())//
+                        .forEach(stage -> //
+                manager.updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.DISCARDED, null, null));
+            }
+            manager.saveUpdatedJob();
+        });
+
+        Assert.assertEquals(1, manager.getAllJobs().get(0).getMvcc());
+    }
+
     @Test
     public void testSetStageOutput() {
         NExecutableManager manager = NExecutableManager.getInstance(jobService.getConfig(), getProject());
@@ -445,6 +491,7 @@ public class StageTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(stage1.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
         manager.updateStageStatus(stage2.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
         manager.updateStageStatus(stage3.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
+        manager.saveUpdatedJob();
 
         manager.makeStageError(executable.getId());
         var job = manager.getJob(executable.getId());
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 5517f3b2c4..a842c926ca 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -206,8 +206,9 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
     public void waiteForResourceStart(ExecutableContext context) {
         // mark waiteForResource stage start
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
-            getExecutableManager(getProject()) //
-                    .updateStageStatus(getId() + "_00", null, ExecutableState.RUNNING, null, null);
+            NExecutableManager manager = getExecutableManager(getProject());
+            manager.updateStageStatus(getId() + "_00", null, ExecutableState.RUNNING, null, null);
+            manager.saveUpdatedJob();
             return 0;
         }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), getTempLockName());
     }
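
Taken together, the changes in this commit make updateStageStatus mutate only the manager's in-memory copy of the job; nothing is persisted until saveUpdatedJob() runs, so a build with many parallel segments produces one metadata write (one mvcc bump) per transaction instead of one per stage update. A minimal sketch of the calling pattern, assuming config, project, jobId, segmentId and stages are already in scope for an existing job (these names are placeholders, not new API):

    // Batch several stage updates, then persist them in a single write.
    EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
        NExecutableManager manager = NExecutableManager.getInstance(config, project);
        for (StageBase stage : stages) {
            // Only updates the in-memory job; no metadata write happens here.
            manager.updateStageStatus(stage.getId(), segmentId, ExecutableState.RUNNING, null, null);
        }
        // One explicit save flushes every pending change at once.
        manager.saveUpdatedJob();
        return null;
    }, project, UnitOfWork.DEFAULT_MAX_RETRY, UnitOfWork.DEFAULT_EPOCH_ID, jobId);

The new StageTest case above (testUpdateStageStatusNoSaveCache) checks exactly this invariant: after adding the job, discarding every stage of a task, and saving once, the job's mvcc is 1.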


[kylin] 08/15: KYLIN-5354 change 'server.max-http-header-size' from 5MB to 32KB

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit b7fff5be2ac0fa10688d378cfd73a0c984fb2662
Author: Yaguang Jia <ji...@foxmail.com>
AuthorDate: Wed Oct 26 10:38:38 2022 +0800

    KYLIN-5354 change 'server.max-http-header-size' from 5MB to 32KB
    
    from 39341
    
    Co-authored-by: Yaguang Jia <ji...@foxmail.com>
---
 src/common-booter/src/main/resources/application.yaml       | 2 +-
 src/data-loading-booter/src/main/resources/application.yaml | 2 +-
 src/query-booter/src/main/resources/application.yaml        | 2 +-
 src/server/src/main/resources/application.yaml              | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/common-booter/src/main/resources/application.yaml b/src/common-booter/src/main/resources/application.yaml
index 0be4da873f..a0e67ed70c 100644
--- a/src/common-booter/src/main/resources/application.yaml
+++ b/src/common-booter/src/main/resources/application.yaml
@@ -30,7 +30,7 @@ server:
     context-path: "/kylin"
     encoding:
       charset: UTF-8
-  max-http-header-size: 5242880
+  max-http-header-size: 32KB
 
 management:
   endpoint:
diff --git a/src/data-loading-booter/src/main/resources/application.yaml b/src/data-loading-booter/src/main/resources/application.yaml
index 44858a5e2a..738a966765 100644
--- a/src/data-loading-booter/src/main/resources/application.yaml
+++ b/src/data-loading-booter/src/main/resources/application.yaml
@@ -30,7 +30,7 @@ server:
     context-path: "/kylin"
     encoding:
       charset: UTF-8
-  max-http-header-size: 5242880
+  max-http-header-size: 32KB
 
 management:
   endpoint:
diff --git a/src/query-booter/src/main/resources/application.yaml b/src/query-booter/src/main/resources/application.yaml
index 31526e0f2f..444b7015c8 100644
--- a/src/query-booter/src/main/resources/application.yaml
+++ b/src/query-booter/src/main/resources/application.yaml
@@ -30,7 +30,7 @@ server:
     context-path: "/kylin"
     encoding:
       charset: UTF-8
-  max-http-header-size: 5242880
+  max-http-header-size: 32KB
 
 management:
   endpoint:
diff --git a/src/server/src/main/resources/application.yaml b/src/server/src/main/resources/application.yaml
index 4fe508de1b..c9d3b74e5a 100644
--- a/src/server/src/main/resources/application.yaml
+++ b/src/server/src/main/resources/application.yaml
@@ -31,7 +31,7 @@ server:
     context-path: "/kylin"
     encoding:
       charset: UTF-8
-  max-http-header-size: 5242880
+  max-http-header-size: 32KB
 
 management:
   endpoint:
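
Spring Boot binds server.max-http-header-size to a DataSize, so a bare number is read as bytes while a suffixed value carries its own unit: the old setting was 5242880 bytes (5 MB), the new one is 32KB (32768 bytes). A quick way to sanity-check the equivalence, assuming spring-core's org.springframework.util.unit.DataSize is on the classpath; the class below is illustrative only:

    import org.springframework.util.unit.DataSize;

    public class HeaderSizeCheck {
        public static void main(String[] args) {
            // Suffixed form used in the new config: "32KB" -> 32768 bytes.
            System.out.println(DataSize.parse("32KB").toBytes());        // 32768
            // The old raw value was interpreted as bytes: 5242880 -> 5 MB.
            System.out.println(DataSize.ofBytes(5242880).toMegabytes()); // 5
        }
    }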


[kylin] 05/15: KYLIN-5351 minor fix of KYLIN-5333

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 21c31f4556853d16a8cb9336e7887f2afeaae14b
Author: Hang Jia <75...@qq.com>
AuthorDate: Mon Oct 24 16:17:22 2022 +0800

    KYLIN-5351 minor fix of KYLIN-5333
    
    Co-authored-by: Hang Jia <75...@qq.com>
---
 .../java/org/apache/kylin/rest/controller/v2/JobControllerV2.java     | 4 ++++
 1 file changed, 4 insertions(+)
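
The four lines added below restore 3.x client compatibility: legacy callers send sortby (whose 3.x default was last_modify), while the current endpoint reads sortBy (default last_modified), so an explicit, non-default legacy value takes precedence. A small sketch of the fallback rule in isolation, assuming StringUtils.isEmpty from org.apache.commons.lang3 (the controller's actual import may differ) and hypothetical request values:

    String sortBy = "last_modified";   // current default
    String sortby = "job_name";        // explicit value from a 3.x client
    // Ignore the legacy param when absent or still at its 3.x default.
    if (!StringUtils.isEmpty(sortby) && !"last_modify".equals(sortby)) {
        sortBy = sortby;               // legacy non-default value wins
    }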

diff --git a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/v2/JobControllerV2.java b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/v2/JobControllerV2.java
index 414563adf8..fae3e21e61 100644
--- a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/v2/JobControllerV2.java
+++ b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/v2/JobControllerV2.java
@@ -90,6 +90,10 @@ public class JobControllerV2 extends BaseController {
             @RequestParam(value = "sortBy", required = false, defaultValue = "last_modified") String sortBy,
             @RequestParam(value = "sortby", required = false) String sortby, //param for 3x
             @RequestParam(value = "reverse", required = false, defaultValue = "true") Boolean reverse) {
+        // 3x default last_modify
+        if (!StringUtils.isEmpty(sortby) && !"last_modify".equals(sortby)) {
+            sortBy = sortby;
+        }
         checkNonNegativeIntegerArg("pageOffset", pageOffset);
         checkNonNegativeIntegerArg("pageSize", pageSize);
         List<String> statuses = Lists.newArrayList();