Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/12 03:47:05 UTC

[incubator-doris] branch branch-0.15 updated (4309efe -> fe8f299)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git.


    from 4309efe  [Cherry-pick] Add /backends HTTP api for spark/flink connector
     new cecd369  [libhdfs] Add errno for hdfs writer. When no dir exists, the hdfs writer open fails and the dir needs to be created. (#7050)
     new 42c93ff  [RoutineLoad] Add "runningTxns" field in SHOW ROUTINE LOAD result (#6986)
     new 0ee3485  [Colocate] Fix bug that colocate group can not be redistributed after dropping a backend (#7020)
     new 8b8d21c  [Compile] Fix spark-connector compile problem (#7048)
     new aea87ca  [Bug] Fix bug that NPE thrown when adding partition for table with MV (#7069)
     new c53c006  [Bug] Fix NumberFormatException for partition cache (#6846)
     new b1bc9f8  [Feature] Clean up old sync jobs regularly (#7061)
     new 130ed35  [BUG] Fix CacheAnalyzer's bug when an aggregate column contains an expression. (#7085)
     new fe8f299  [Bug] Fix bug with using tableId to get table in publish version (#7091)

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/hdfs_file_reader.cpp                   | 16 +++++----
 be/src/exec/hdfs_writer.cpp                        | 32 ++++++++++++++++--
 .../administrator-guide/operation/multi-tenant.md  |  6 ++--
 docs/en/installing/upgrade.md                      | 19 +++++++++++
 .../administrator-guide/operation/multi-tenant.md  |  6 ++--
 docs/zh-CN/installing/upgrade.md                   | 19 +++++++++++
 extension/spark-doris-connector/build.sh           |  1 +
 .../java/org/apache/doris/analysis/SlotRef.java    |  1 +
 .../java/org/apache/doris/catalog/Catalog.java     |  2 +-
 .../apache/doris/catalog/ColocateTableIndex.java   |  8 +++--
 .../main/java/org/apache/doris/catalog/Column.java | 16 ++++-----
 .../java/org/apache/doris/catalog/Database.java    | 16 +++++++++
 .../doris/catalog/MaterializedIndexMeta.java       |  1 -
 .../clone/ColocateTableCheckerAndBalancer.java     | 20 +++++++-----
 .../apache/doris/common/proc/ReplicasProcNode.java |  2 +-
 .../doris/http/meta/ColocateMetaService.java       |  2 +-
 .../doris/httpv2/meta/ColocateMetaService.java     |  2 +-
 .../doris/load/routineload/RoutineLoadJob.java     |  2 ++
 .../load/routineload/RoutineLoadStatistic.java     |  7 ++++
 .../load/routineload/RoutineLoadTaskInfo.java      |  1 +
 .../org/apache/doris/load/sync/SyncChecker.java    |  8 ++++-
 .../java/org/apache/doris/load/sync/SyncJob.java   | 17 ++++++++++
 .../org/apache/doris/load/sync/SyncJobManager.java | 38 ++++++++++++++++++++++
 .../org/apache/doris/qe/cache/CacheAnalyzer.java   |  4 +++
 .../org/apache/doris/qe/cache/PartitionRange.java  | 30 ++++++++++-------
 .../main/java/org/apache/doris/system/Backend.java |  3 ++
 .../org/apache/doris/task/CreateRollupTask.java    |  1 +
 .../org/apache/doris/task/SchemaChangeTask.java    |  1 +
 .../doris/transaction/DatabaseTransactionMgr.java  |  2 +-
 .../apache/doris/load/sync/SyncJobManagerTest.java | 36 ++++++++++++++++++++
 30 files changed, 268 insertions(+), 51 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 06/09: [Bug] Fix NumberFormatException for partition cache (#6846)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit c53c00635b72edc5522a83ed5f4411f5ce5a83d8
Author: ChPi <ch...@gmail.com>
AuthorDate: Fri Nov 12 10:36:58 2021 +0800

    [Bug] Fix NumberFormatException for partition cache (#6846)
    
    Fix #6845
---
 .../org/apache/doris/qe/cache/PartitionRange.java  | 30 ++++++++++++++--------
 1 file changed, 19 insertions(+), 11 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionRange.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionRange.java
index 91e393e..5f815bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionRange.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionRange.java
@@ -27,7 +27,6 @@ import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.PartitionItem;
-import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.RangePartitionInfo;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Partition;
@@ -149,17 +148,26 @@ public class PartitionRange {
         public Date date;
 
         public boolean init(Type type, String str) {
-            if (type.getPrimitiveType() == PrimitiveType.DATE) {
-                try {
-                    date = df10.parse(str);
-                } catch (Exception e) {
-                    LOG.warn("parse error str{}.", str);
+            switch (type.getPrimitiveType()) {
+                case DATE:
+                    try {
+                        date = df10.parse(str);
+                    } catch (Exception e) {
+                        LOG.warn("parse error str{}.", str);
+                        return false;
+                    }
+                    keyType = KeyType.DATE;
+                    break;
+                case TINYINT:
+                case SMALLINT:
+                case INT:
+                case BIGINT:
+                    value = Long.parseLong(str);
+                    keyType = KeyType.LONG;
+                    break;
+                default:
+                    LOG.info("PartitionCache not support such key type {}", type.toSql());
                     return false;
-                }
-                keyType = KeyType.DATE;
-            } else {
-                value = Long.valueOf(str);
-                keyType = KeyType.LONG;
             }
             return true;
         }
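
For context: the old code routed every non-DATE key through `Long.valueOf(str)`, which throws an unchecked NumberFormatException for non-numeric partition keys. A minimal Java sketch of the failure mode and the new guard, using hypothetical values (not the committed code):

```
public class PartitionKeyParseSketch {
    // Simplified stand-in for PartitionRange.PartitionKeyType.init():
    // only integral key types are parsed as longs; anything else is
    // rejected up front so the query skips the partition cache
    // instead of failing.
    static boolean tryInit(String primitiveType, String str) {
        switch (primitiveType) {
            case "TINYINT":
            case "SMALLINT":
            case "INT":
            case "BIGINT":
                long value = Long.parseLong(str);
                return true;
            default:
                return false; // PartitionCache does not support this key type
        }
    }

    public static void main(String[] args) {
        try {
            Long.valueOf("beijing"); // old path: hypothetical VARCHAR key
        } catch (NumberFormatException e) {
            System.out.println("old path fails: " + e.getMessage());
        }
        System.out.println(tryInit("BIGINT", "42"));       // true
        System.out.println(tryInit("VARCHAR", "beijing")); // false, no exception
    }
}
```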

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 03/09: [Colocate] Fix bug that colocate group can not be redistributed after dropping a backend (#7020)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 0ee3485c8a09f6258106d9ed097ada33d37b40e1
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu Nov 11 15:41:49 2021 +0800

    [Colocate] Fix bug that colocate group can not be redistributed after dropping a backend (#7020)
    
    Main changes:
    
    1. Fix [Bug] Colocate group can not be redistributed after dropping a backend #7019
    2. Add a detail msg about why a colocate group is unstable.
    3. Add more suggestions for upgrading a Doris cluster.
---
 .../en/administrator-guide/operation/multi-tenant.md |  6 +++---
 docs/en/installing/upgrade.md                        | 19 +++++++++++++++++++
 .../administrator-guide/operation/multi-tenant.md    |  6 +++---
 docs/zh-CN/installing/upgrade.md                     | 19 +++++++++++++++++++
 .../main/java/org/apache/doris/catalog/Catalog.java  |  2 +-
 .../org/apache/doris/catalog/ColocateTableIndex.java |  8 ++++++--
 .../doris/clone/ColocateTableCheckerAndBalancer.java | 20 ++++++++++++--------
 .../apache/doris/common/proc/ReplicasProcNode.java   |  2 +-
 .../apache/doris/http/meta/ColocateMetaService.java  |  2 +-
 .../doris/httpv2/meta/ColocateMetaService.java       |  2 +-
 .../main/java/org/apache/doris/system/Backend.java   |  3 +++
 11 files changed, 69 insertions(+), 20 deletions(-)

diff --git a/docs/en/administrator-guide/operation/multi-tenant.md b/docs/en/administrator-guide/operation/multi-tenant.md
index bb2a06a..b47a523 100644
--- a/docs/en/administrator-guide/operation/multi-tenant.md
+++ b/docs/en/administrator-guide/operation/multi-tenant.md
@@ -1,6 +1,6 @@
 ---
 {
-    "title": "Multi-tenancy(Experimental)",
+    "title": "Multi-tenancy(Deprecated)",
     "language": "en"
 }
 ---
@@ -24,9 +24,9 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Multi-tenancy(Experimental)
+# Multi-tenancy(Deprecated)
 
-This function is experimental and is not recommended for use in production environment.
+This function is deprecated. Please see [Multi-Tenant](../multi-tenant.md).
 
 ## Background
 Doris, as a PB-level online report and multi-dimensional analysis database, provides cloud-based database services through open cloud, and deploys a physical cluster for each client in the cloud. Internally, a physical cluster deploys multiple services, and separately builds clusters for services with high isolation requirements. In view of the above problems:
diff --git a/docs/en/installing/upgrade.md b/docs/en/installing/upgrade.md
index be2cde0..47b3118 100644
--- a/docs/en/installing/upgrade.md
+++ b/docs/en/installing/upgrade.md
@@ -32,6 +32,25 @@ Doris can upgrade smoothly by rolling upgrades. The following steps are recommen
 > Note:
 > 1. The following approaches are based on highly available deployments. That is, data 3 replicas, FE high availability.
 
+## Preparation
+
+1. Turn off the replica repair and balance operation.
+
+     There will be node restarts during the upgrade process, which may trigger unnecessary cluster balancing and replica repair logic. You can disable them first with the following commands:
+
+     ```
+     # Turn off the replica balance logic. After it is disabled, the balancing operation of ordinary table replicas will no longer be triggered.
+     $ mysql-client> admin set frontend config("disable_balance" = "true");
+
+     # Turn off the replica balance logic of the colocation table. After it is disabled, the replica redistribution operation of colocation tables will no longer be triggered.
+     $ mysql-client> admin set frontend config("disable_colocate_balance");
+
+     # Turn off the replica scheduling logic. After shutting down, all generated replica repair and balancing tasks will no longer be scheduled.
+     $ mysql-client> admin set frontend config("disable_tablet_scheduler" = "true");
+     ```
+
+     After the cluster is upgraded, use the above commands to set the corresponding configurations back to their original values.
+
 ## Test the correctness of BE upgrade
 
 1. Arbitrarily select a BE node and deploy the latest palo_be binary file.
diff --git a/docs/zh-CN/administrator-guide/operation/multi-tenant.md b/docs/zh-CN/administrator-guide/operation/multi-tenant.md
index 176d600..dc818cd 100644
--- a/docs/zh-CN/administrator-guide/operation/multi-tenant.md
+++ b/docs/zh-CN/administrator-guide/operation/multi-tenant.md
@@ -1,6 +1,6 @@
 ---
 {
-    "title": "多租户(Experimental)",
+    "title": "多租户(弃用)",
     "language": "zh-CN"
 }
 ---
@@ -24,9 +24,9 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# 多租户(Experimental)
+# 多租户(已弃用)
 
-该功能为实验性质,暂不建议在生产环境使用。
+该功能已弃用。新方案请参阅:[多租户和资源划分](../multi-tenant.md)。
 
 ## 背景
 Doris 作为一款 PB 级别的在线报表与多维分析数据库,对外通过开放云提供云端的数据库服务,并且对于每个云上的客户都单独部署了一套物理集群。对内,一套物理集群部署了多个业务,对于隔离性要求比较高的业务单独搭建了集群。针对以上存在几点问题:
diff --git a/docs/zh-CN/installing/upgrade.md b/docs/zh-CN/installing/upgrade.md
index 3315a06..e6d934d 100644
--- a/docs/zh-CN/installing/upgrade.md
+++ b/docs/zh-CN/installing/upgrade.md
@@ -31,6 +31,25 @@ Doris 可以通过滚动升级的方式,平滑进行升级。建议按照以
 > 注:  
 > 1. 以下方式均建立在高可用部署的情况下。即数据 3 副本,FE 高可用情况下。  
 
+## 前置工作
+
+1. 关闭集群副本修复和均衡功能
+
+	升级过程中会有节点重启,所以可能会触发不必要的集群均衡和副本修复逻辑。可以先通过以下命令关闭:
+
+	```
+	# 关闭副本均衡逻辑。关闭后,不会再触发普通表副本的均衡操作。
+	$ mysql-client > admin set frontend config("disable_balance" = "true");
+
+	# 关闭 colocation 表的副本均衡逻辑。关闭后,不会再触发 colocation 表的副本重分布操作。
+	$ mysql-client > admin set frontend config("disable_colocate_balance" = "true");
+
+	# 关闭副本调度逻辑。关闭后,所有已产生的副本修复和均衡任务不会再被调度。
+	$ mysql-client > admin set frontend config("disable_tablet_scheduler" = "true");
+	```
+
+	当集群升级完毕后,再通过以上命令将对应配置设为原值即可。
+
 ## 测试 BE 升级正确性
 
 1. 任意选择一个 BE 节点,部署最新的 palo_be 二进制文件。
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 1c10990..485d225 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -5379,7 +5379,7 @@ public class Catalog {
             }
 
             // set this group as unstable
-            colocateTableIndex.markGroupUnstable(groupId, false /* edit log is along with modify table log */);
+            colocateTableIndex.markGroupUnstable(groupId, "Colocation group modified by user", false /* edit log is along with modify table log */);
             table.setColocateGroup(colocateGroup);
         } else {
             // unset colocation group
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
index 0aca4ac..88d221b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
@@ -177,6 +177,7 @@ public class ColocateTableIndex implements Writable {
                         tbl.getDefaultReplicaAllocation());
                 groupName2Id.put(fullGroupName, groupId);
                 group2Schema.put(groupId, groupSchema);
+                group2ErrMsgs.put(groupId, "");
             }
             group2Tables.put(groupId, tbl.getId());
             table2Group.put(tbl.getId(), groupId);
@@ -206,13 +207,14 @@ public class ColocateTableIndex implements Writable {
         }
     }
 
-    public void markGroupUnstable(GroupId groupId, boolean needEditLog) {
+    public void markGroupUnstable(GroupId groupId, String reason, boolean needEditLog) {
         writeLock();
         try {
             if (!group2Tables.containsKey(groupId)) {
                 return;
             }
             if (unstableGroups.add(groupId)) {
+                group2ErrMsgs.put(groupId, Strings.nullToEmpty(reason));
                 if (needEditLog) {
                     ColocatePersistInfo info = ColocatePersistInfo.createForMarkUnstable(groupId);
                     Catalog.getCurrentCatalog().getEditLog().logColocateMarkUnstable(info);
@@ -231,6 +233,7 @@ public class ColocateTableIndex implements Writable {
                 return;
             }
             if (unstableGroups.remove(groupId)) {
+                group2ErrMsgs.put(groupId, "");
                 if (needEditLog) {
                     ColocatePersistInfo info = ColocatePersistInfo.createForMarkStable(groupId);
                     Catalog.getCurrentCatalog().getEditLog().logColocateMarkStable(info);
@@ -255,6 +258,7 @@ public class ColocateTableIndex implements Writable {
                 // all tables of this group are removed, remove the group
                 group2BackendsPerBucketSeq.rowMap().remove(groupId);
                 group2Schema.remove(groupId);
+                group2ErrMsgs.remove(groupId);
                 unstableGroups.remove(groupId);
                 String fullGroupName = null;
                 for (Map.Entry<String, GroupId> entry : groupName2Id.entrySet()) {
@@ -537,7 +541,7 @@ public class ColocateTableIndex implements Writable {
     }
 
     public void replayMarkGroupUnstable(ColocatePersistInfo info) {
-        markGroupUnstable(info.getGroupId(), false);
+        markGroupUnstable(info.getGroupId(), "replay mark group unstable", false);
     }
 
     public void replayMarkGroupStable(ColocatePersistInfo info) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index b0d06d1..14676d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -48,6 +48,7 @@ import com.google.common.collect.Table;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.parquet.Strings;
 
 import java.util.List;
 import java.util.Map;
@@ -161,6 +162,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
                 Catalog.getCurrentSystemInfo().checkReplicaAllocation(db.getClusterName(), replicaAlloc);
             } catch (DdlException e) {
                 colocateIndex.setErrMsgForGroup(groupId, e.getMessage());
+                continue;
             }
             Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
 
@@ -220,7 +222,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
                 continue;
             }
 
-            boolean isGroupStable = true;
+            String unstableReason = null;
             OUT: for (Long tableId : tableIds) {
                 OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
                 if (olapTable == null || !colocateIndex.isColocateTable(olapTable.getId())) {
@@ -244,8 +246,8 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
                                 Tablet tablet = index.getTablet(tabletId);
                                 TabletStatus st = tablet.getColocateHealthStatus(visibleVersion, replicaAlloc, bucketsSeq);
                                 if (st != TabletStatus.HEALTHY) {
-                                    isGroupStable = false;
-                                    LOG.debug("get unhealthy tablet {} in colocate table. status: {}", tablet.getId(), st);
+                                    unstableReason = String.format("get unhealthy tablet %d in colocate table. status: %s", tablet.getId(), st);
+                                    LOG.debug(unstableReason);
 
                                     if (!tablet.readyToBeRepaired(Priority.HIGH)) {
                                         continue;
@@ -279,10 +281,10 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
             } // end for tables
 
             // mark group as stable or unstable
-            if (isGroupStable) {
+            if (Strings.isNullOrEmpty(unstableReason)) {
                 colocateIndex.markGroupStable(groupId, true);
             } else {
-                colocateIndex.markGroupUnstable(groupId, true);
+                colocateIndex.markGroupUnstable(groupId, unstableReason, true);
             }
         } // end for groups
     }
@@ -470,10 +472,12 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
             for (Long beId : backendIds) {
                 Backend be = infoService.getBackend(beId);
                 if (be == null) {
-                    LOG.info("backend {} does not exist", beId);
-                    return null;
+                    // For a non-existent BE (maybe dropped), add the dummy IP 0.0.0.0,
+                    // and the following logic will handle the non-existent host.
+                    hosts.add(Backend.DUMMY_IP);
+                } else {
+                    hosts.add(be.getHost());
                 }
-                hosts.add(be.getHost());
             }
             hostsPerBucketSeq.add(hosts);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
index a8d117d..eabce8d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
@@ -57,7 +57,7 @@ public class ReplicasProcNode implements ProcNodeInterface {
         result.setNames(TITLE_NAMES);
         for (Replica replica : replicas) {
             Backend be = backendMap.get(replica.getBackendId());
-            String host = (be == null ? "0.0.0.0" : be.getHost());
+            String host = (be == null ? Backend.DUMMY_IP : be.getHost());
             int port = (be == null ? 0 : be.getHttpPort());
             String metaUrl = String.format("http://%s:%d/api/meta/header/%d/%d",
                     host, port,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/http/meta/ColocateMetaService.java b/fe/fe-core/src/main/java/org/apache/doris/http/meta/ColocateMetaService.java
index 777586e..42b713d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/http/meta/ColocateMetaService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/http/meta/ColocateMetaService.java
@@ -147,7 +147,7 @@ public class ColocateMetaService {
 
             HttpMethod method = request.getRequest().method();
             if (method.equals(HttpMethod.POST)) {
-                colocateIndex.markGroupUnstable(groupId, true);
+                colocateIndex.markGroupUnstable(groupId, "mark unstable via http api", true);
             } else if (method.equals(HttpMethod.DELETE)) {
                 colocateIndex.markGroupStable(groupId, true);
             } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java
index b9c75ff..8fc22ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java
@@ -110,7 +110,7 @@ public class ColocateMetaService extends RestBaseController {
 
         String method = request.getMethod();
         if ("POST".equalsIgnoreCase(method)) {
-            colocateIndex.markGroupUnstable(groupId, true);
+            colocateIndex.markGroupUnstable(groupId, "mark unstable via http api", true);
         } else if ("DELETE".equalsIgnoreCase(method)) {
             colocateIndex.markGroupStable(groupId, true);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index 03fd69a..0c740b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -53,6 +53,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class Backend implements Writable {
 
+    // Represent a meaningless IP
+    public static final String DUMMY_IP = "0.0.0.0";
+
     public enum BackendState {
         using, /* backend is belong to a cluster*/
         offline,
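
For context: the key behavioral change is in getHostsPerBucketSeq. Previously a single dropped backend made the method return null, so the colocate group could never be redistributed; now the dropped backend is represented by the DUMMY_IP placeholder and handled downstream as an unavailable host. A minimal sketch of the before/after, with hypothetical backend ids and hosts:

```
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BucketHostsSketch {
    static final String DUMMY_IP = "0.0.0.0"; // placeholder for a dropped backend

    // Old logic: bail out with null as soon as one backend is gone,
    // which left the colocate group stuck forever.
    static List<String> hostsOld(List<Long> beIds, Map<Long, String> beHosts) {
        List<String> hosts = new ArrayList<>();
        for (Long beId : beIds) {
            String host = beHosts.get(beId);
            if (host == null) {
                return null;
            }
            hosts.add(host);
        }
        return hosts;
    }

    // New logic: keep going with a dummy host so the balancer can
    // still pick a replacement backend for that bucket.
    static List<String> hostsNew(List<Long> beIds, Map<Long, String> beHosts) {
        List<String> hosts = new ArrayList<>();
        for (Long beId : beIds) {
            hosts.add(beHosts.getOrDefault(beId, DUMMY_IP));
        }
        return hosts;
    }

    public static void main(String[] args) {
        Map<Long, String> alive = Map.of(10001L, "192.168.0.1"); // 10002 was dropped
        List<Long> bucket = List.of(10001L, 10002L);
        System.out.println(hostsOld(bucket, alive)); // null
        System.out.println(hostsNew(bucket, alive)); // [192.168.0.1, 0.0.0.0]
    }
}
```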

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 04/09: [Compile] Fix spark-connector compile problem (#7048)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 8b8d21cc5ea0bd9a04c59c06df99db1c17e9edf5
Author: wei zhao <zh...@163.com>
AuthorDate: Thu Nov 11 15:42:30 2021 +0800

    [Compile] Fix spark-connector compile problem (#7048)
    
    Use `thrift` in thirdparty
---
 extension/spark-doris-connector/build.sh | 1 +
 1 file changed, 1 insertion(+)

diff --git a/extension/spark-doris-connector/build.sh b/extension/spark-doris-connector/build.sh
index b4ea042..3ba6333 100755
--- a/extension/spark-doris-connector/build.sh
+++ b/extension/spark-doris-connector/build.sh
@@ -30,6 +30,7 @@ ROOT=`cd "$ROOT"; pwd`
 
 
 export DORIS_HOME=${ROOT}/../../
+export PATH=${DORIS_THIRDPARTY}/installed/bin:$PATH
 
 # include custom environment variables
 if [[ -f ${DORIS_HOME}/custom_env.sh ]]; then

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 01/09: [libhdfs] Add errno for hdfs writer. When no dir exists, the hdfs writer open fails and the dir needs to be created. (#7050)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit cecd369f3f463aebe295d8b233d319aeb66756c4
Author: pengxiangyu <di...@163.com>
AuthorDate: Thu Nov 11 15:21:21 2021 +0800

    [libhdfs] Add errno for hdfs writer. When no dir exists, the hdfs writer open fails and the dir needs to be created. (#7050)
    
    1. Add an errno message when the hdfs writer fails.
    2. When calling openWrite for hdfs, the dir will be created if it doesn't exist.
---
 be/src/exec/hdfs_file_reader.cpp | 16 ++++++++++------
 be/src/exec/hdfs_writer.cpp      | 32 +++++++++++++++++++++++++++++---
 2 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/be/src/exec/hdfs_file_reader.cpp b/be/src/exec/hdfs_file_reader.cpp
index a6f509f..0ba1a3a 100644
--- a/be/src/exec/hdfs_file_reader.cpp
+++ b/be/src/exec/hdfs_file_reader.cpp
@@ -20,6 +20,7 @@
 #include <unistd.h>
 
 #include "common/logging.h"
+#include "service/backend_options.h"
 
 namespace doris {
 
@@ -80,7 +81,8 @@ Status HdfsFileReader::open() {
     _hdfs_file = hdfsOpenFile(_hdfs_fs, _path.c_str(), O_RDONLY, 0, 0, 0);
     if (_hdfs_file == nullptr) {
         std::stringstream ss;
-        ss << "open file failed. " << _namenode << _path;
+        ss << "open file failed. " << "(BE: " << BackendOptions::get_localhost() << ")"
+                << _namenode << _path << ", err: " << strerror(errno);
         return Status::InternalError(ss.str());
     }
     LOG(INFO) << "open file. " << _namenode << _path;
@@ -139,7 +141,8 @@ Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r
         int ret = hdfsSeek(_hdfs_fs, _hdfs_file, position);
         if (ret != 0) { // check fseek return value
             std::stringstream ss;
-            ss << "hdfsSeek failed. " << _namenode << _path;
+            ss << "hdfsSeek failed. " << "(BE: " << BackendOptions::get_localhost() << ")"
+                    << _namenode << _path << ", err: " << strerror(errno);
             return Status::InternalError(ss.str());
         }
     }
@@ -147,7 +150,8 @@ Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r
     *bytes_read = hdfsRead(_hdfs_fs, _hdfs_file, out, nbytes);
     if (*bytes_read < 0) {
         std::stringstream ss;
-        ss << "Read hdfs file failed. " << _namenode << _path;
+        ss << "Read hdfs file failed. " << "(BE: " << BackendOptions::get_localhost() << ")"
+                << _namenode << _path << ", err: " << strerror(errno);
         return Status::InternalError(ss.str());
     }
     _current_offset += *bytes_read; // save offset with file
@@ -165,7 +169,7 @@ int64_t HdfsFileReader::size() {
         }
         hdfsFileInfo* file_info = hdfsGetPathInfo(_hdfs_fs, _path.c_str());
         if (file_info == nullptr) {
-            LOG(WARNING) << "get path info failed: " << _namenode << _path;
+            LOG(WARNING) << "get path info failed: " << _namenode << _path << ", err: " << strerror(errno);
             close();
             return -1;
         }
@@ -183,8 +187,8 @@ Status HdfsFileReader::seek(int64_t position) {
     if (res != 0) {
         char err_buf[64];
         std::stringstream ss;
-        ss << "Seek to offset failed. offset=" << position
-           << ", error=" << strerror_r(errno, err_buf, 64);
+        ss << "Seek to offset failed. " << "(BE: " << BackendOptions::get_localhost() << ")"
+                << " offset=" << position << ", err: " << strerror(errno);
         return Status::InternalError(ss.str());
     }
     return Status::OK();
diff --git a/be/src/exec/hdfs_writer.cpp b/be/src/exec/hdfs_writer.cpp
index a00413f..d608a77 100644
--- a/be/src/exec/hdfs_writer.cpp
+++ b/be/src/exec/hdfs_writer.cpp
@@ -17,7 +17,10 @@
 
 #include "exec/hdfs_writer.h"
 
+#include <filesystem>
+
 #include "common/logging.h"
+#include "service/backend_options.h"
 
 namespace doris {
 const static std::string FS_KEY = "fs.defaultFS";
@@ -47,11 +50,30 @@ Status HDFSWriter::open() {
         // the path already exists
         return Status::AlreadyExist(_path + " already exists.");
     }
+
+    std::filesystem::path hdfs_path(_path);
+    std::string hdfs_dir = hdfs_path.parent_path().string();
+    exists = hdfsExists(_hdfs_fs, hdfs_dir.c_str());
+    if (exists != 0) {
+        LOG(INFO) << "hdfs dir doesn't exist, create it: " << hdfs_dir;
+        int ret = hdfsCreateDirectory(_hdfs_fs, hdfs_dir.c_str());
+        if (ret != 0) {
+            std::stringstream ss;
+            ss << "create dir failed. " << "(BE: " << BackendOptions::get_localhost() << ")"
+                    << " namenode: " << _namenode << " path: " << hdfs_dir
+                    << ", err: " << strerror(errno);
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+    }
     // open file
     _hdfs_file = hdfsOpenFile(_hdfs_fs, _path.c_str(), O_WRONLY, 0, 0, 0);
     if (_hdfs_file == nullptr) {
         std::stringstream ss;
-        ss << "open file failed. namenode:" << _namenode << " path:" << _path;
+        ss << "open file failed. " << "(BE: " << BackendOptions::get_localhost() << ")"
+                << " namenode:" << _namenode << " path:" << _path
+                << ", err: " << strerror(errno);
+        LOG(WARNING) << ss.str();
         return Status::InternalError(ss.str());
     }
     LOG(INFO) << "open file. namenode:" << _namenode << " path:" << _path;
@@ -66,7 +88,9 @@ Status HDFSWriter::write(const uint8_t* buf, size_t buf_len, size_t* written_len
     int32_t result = hdfsWrite(_hdfs_fs, _hdfs_file, buf, buf_len);
     if (result < 0) {
         std::stringstream ss;
-        ss << "write file failed. namenode:" << _namenode << " path:" << _path;
+        ss << "write file failed. " << "(BE: " << BackendOptions::get_localhost() << ")"
+                << "namenode:" << _namenode << " path:" << _path
+                << ", err: " << strerror(errno);
         LOG(WARNING) << ss.str();
         return Status::InternalError(ss.str());
     }
@@ -91,7 +115,9 @@ Status HDFSWriter::close() {
     int result = hdfsFlush(_hdfs_fs, _hdfs_file);
     if (result == -1) {
         std::stringstream ss;
-        ss << "failed to flush hdfs file. namenode:" << _namenode << " path:" << _path;
+        ss << "failed to flush hdfs file. " << "(BE: " << BackendOptions::get_localhost() << ")"
+                << "namenode:" << _namenode << " path:" << _path
+                << ", err: " << strerror(errno);
         LOG(WARNING) << ss.str();
         return Status::InternalError(ss.str());
     }
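
For context: the writer change follows the common create-parent-then-open pattern: check whether the parent directory of the target path exists, create it if missing, and only then open the file, attaching BE host and errno context to every failure message. The actual code is C++ against libhdfs; below is a rough Java analogue of the same pattern (the path and host are hypothetical):

```
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class CreateDirThenOpenSketch {
    // Mirrors the shape of HDFSWriter::open(): refuse to overwrite,
    // create the missing parent directory, then open the file.
    static OutputStream open(Path path, String beHost) throws IOException {
        if (Files.exists(path)) {
            throw new IOException(path + " already exists.");
        }
        Path dir = path.getParent();
        if (dir != null && !Files.exists(dir)) {
            Files.createDirectories(dir); // hdfsCreateDirectory equivalent
        }
        try {
            return Files.newOutputStream(path);
        } catch (IOException e) {
            // The commit adds this kind of context (BE host + errno) to the message.
            throw new IOException("open file failed. (BE: " + beHost + ") path: "
                    + path + ", err: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path p = Path.of("/tmp/doris_sketch/out.dat"); // hypothetical path
        try (OutputStream os = open(p, "127.0.0.1")) {
            os.write(42);
        }
    }
}
```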

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 05/09: [Bug] Fix bug that NPE thrown when adding partition for table with MV (#7069)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit aea87ca0444c01c7c1244a2084042eab9045c1d4
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu Nov 11 15:43:16 2021 +0800

    [Bug] Fix bug that NPE thrown when adding partition for table with MV (#7069)
    
    The `defineExpr` in `Column` must be analyzed before calling its `treeToThrift` method.
    And for CreateReplicaTask, there is no need to set `defineExpr` in TColumn.
---
 .../src/main/java/org/apache/doris/analysis/SlotRef.java |  1 +
 .../src/main/java/org/apache/doris/catalog/Column.java   | 16 ++++++++--------
 .../org/apache/doris/catalog/MaterializedIndexMeta.java  |  1 -
 .../java/org/apache/doris/task/CreateRollupTask.java     |  1 +
 .../java/org/apache/doris/task/SchemaChangeTask.java     |  1 +
 5 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
index 006beff..e1995c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
@@ -404,6 +404,7 @@ public class SlotRef extends Expr {
 
     @Override
     public boolean isNullable() {
+        Preconditions.checkNotNull(desc);
         return desc.getIsNullable();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index b875563..2e61df6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -84,7 +84,7 @@ public class Column implements Writable {
     private List<Column> children;
     // Define expr may exist in two forms, one is analyzed, and the other is not analyzed.
     // Currently, analyzed define expr is only used when creating materialized views, so the define expr in RollupJob must be analyzed.
-    // In other cases, such as define expr in `MaterializedIndexMeta`, it may not be analyzed after being relayed.
+    // In other cases, such as define expr in `MaterializedIndexMeta`, it may not be analyzed after being replayed.
     private Expr defineExpr; // use to define column in materialize view
     @SerializedName(value = "visible")
     private boolean visible;
@@ -344,13 +344,13 @@ public class Column implements Writable {
         tColumn.setVisible(visible);
         tColumn.setChildrenColumn(new ArrayList<>());
         toChildrenThrift(this, tColumn);
-
-        // The define expr does not need to be serialized here for now.
-        // At present, only serialized(analyzed) define expr is directly used when creating a materialized view.
-        // It will not be used here, but through another structure `TAlterMaterializedViewParam`.
-        if (this.defineExpr != null) {
-            tColumn.setDefineExpr(this.defineExpr.treeToThrift());
-        }
+        
+        // ATTN:
+        // Currently, this `toThrift()` method is only used from CreateReplicaTask.
+        // And CreateReplicaTask does not need `defineExpr` field.
+        // The `defineExpr` is only used when creating `TAlterMaterializedViewParam`, which is in `AlterReplicaTask`.
+        // And when creating `TAlterMaterializedViewParam`, the `defineExpr` is certainly analyzed.
+        // If we need to use `defineExpr` and call defineExpr.treeToThrift(), make sure it is analyzed, or an NPE will be thrown.
         return tColumn;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java
index 0e6712d..c236554 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java
@@ -58,7 +58,6 @@ public class MaterializedIndexMeta implements Writable, GsonPostProcessable {
     @SerializedName(value = "keysType")
     private KeysType keysType;
     @SerializedName(value = "defineStmt")
-
     private OriginStatement defineStmt;
 
     public MaterializedIndexMeta(long indexId, List<Column> schema, int schemaVersion, int schemaHash,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateRollupTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateRollupTask.java
index 68d00cf..a4fb118 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateRollupTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateRollupTask.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+@Deprecated
 public class CreateRollupTask extends AgentTask {
 
     private long baseTableId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SchemaChangeTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/SchemaChangeTask.java
index 865330c..59616cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/SchemaChangeTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/SchemaChangeTask.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+@Deprecated
 public class SchemaChangeTask extends AgentTask {
 
     private long baseReplicaId;
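
For context: SlotRef.desc is only set during analysis, so serializing an unanalyzed defineExpr dereferences null deep inside treeToThrift. The added Preconditions.checkNotNull turns that anonymous NPE into an explicit failure at the call site. A minimal sketch of the fail-fast pattern (uses Guava; the SlotDescriptor type here is a simplified stand-in):

```
import com.google.common.base.Preconditions;

public class FailFastSketch {
    static class SlotDescriptor {
        boolean getIsNullable() { return true; }
    }

    // Stays null until the expression has been analyzed,
    // just like SlotRef.desc in the FE.
    static SlotDescriptor desc = null;

    static boolean isNullable() {
        // Fails immediately with a descriptive NullPointerException,
        // instead of an anonymous NPE inside thrift serialization.
        Preconditions.checkNotNull(desc, "slot descriptor not analyzed yet");
        return desc.getIsNullable();
    }

    public static void main(String[] args) {
        try {
            isNullable();
        } catch (NullPointerException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```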

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 07/09: [Feature] Clean up old sync jobs regularly (#7061)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit b1bc9f878f54e8f40c7599de1c307ae8c1e3bfeb
Author: xy720 <22...@users.noreply.github.com>
AuthorDate: Fri Nov 12 10:53:50 2021 +0800

    [Feature] Clean up old sync jobs regularly (#7061)
    
    #7060
    #6287
    
    Each job that has been stopped for more than 3 days (set with Config.label_keep_max_second)
    will be permanently cleaned up.
---
 .../org/apache/doris/load/sync/SyncChecker.java    |  8 ++++-
 .../java/org/apache/doris/load/sync/SyncJob.java   | 17 ++++++++++
 .../org/apache/doris/load/sync/SyncJobManager.java | 38 ++++++++++++++++++++++
 .../apache/doris/load/sync/SyncJobManagerTest.java | 36 ++++++++++++++++++++
 4 files changed, 98 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
index 422fe48..8b64f34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
@@ -32,7 +32,7 @@ import java.util.List;
 public class SyncChecker extends MasterDaemon {
     private static final Logger LOG = LogManager.getLogger(SyncChecker.class);
 
-    private SyncJobManager syncJobManager;
+    private final SyncJobManager syncJobManager;
 
     public SyncChecker(SyncJobManager syncJobManager) {
         super("sync checker", Config.sync_checker_interval_second * 1000L);
@@ -44,6 +44,7 @@ public class SyncChecker extends MasterDaemon {
         LOG.debug("start check sync jobs.");
         try {
             process();
+            cleanOldSyncJobs();
         } catch (Throwable e) {
             LOG.warn("Failed to process one round of SyncChecker", e);
         }
@@ -74,4 +75,9 @@ public class SyncChecker extends MasterDaemon {
             }
         }
     }
+
+    private void cleanOldSyncJobs() {
+        // clean up expired sync jobs
+        this.syncJobManager.cleanOldSyncJobs();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java
index c0b0321..3565c04 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java
@@ -23,6 +23,7 @@ import org.apache.doris.analysis.CreateDataSyncJobStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
@@ -266,6 +267,18 @@ public abstract class SyncJob implements Writable {
         return "\\N";
     }
 
+    public boolean isExpired(long currentTimeMs) {
+        if (!isCompleted()) {
+            return false;
+        }
+        Preconditions.checkState(finishTimeMs != -1L);
+        long expireTime = Config.label_keep_max_second * 1000L;
+        if ((currentTimeMs - finishTimeMs) > expireTime) {
+            return true;
+        }
+        return false;
+    }
+
     // only use for persist when job state changed
     public static class SyncJobUpdateStateInfo implements Writable {
         @SerializedName(value = "id")
@@ -450,4 +463,8 @@ public abstract class SyncJob implements Writable {
     public List<ChannelDescription> getChannelDescriptions() {
         return this.channelDescriptions;
     }
+
+    public long getFinishTimeMs() {
+        return finishTimeMs;
+    }
 }
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
index 1a0d8b8..5ac5049 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
@@ -41,6 +41,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -279,6 +280,43 @@ public class SyncJobManager implements Writable {
         }
     }
 
+    // Remove old sync jobs. Called periodically.
+    // Stopped jobs will be removed after Config.label_keep_max_second.
+    public void cleanOldSyncJobs() {
+        LOG.debug("begin to clean old sync jobs ");
+        long currentTimeMs = System.currentTimeMillis();
+        writeLock();
+        try {
+            Iterator<Map.Entry<Long, SyncJob>> iterator = idToSyncJob.entrySet().iterator();
+            while (iterator.hasNext()) {
+                SyncJob syncJob = iterator.next().getValue();
+                if (syncJob.isExpired(currentTimeMs)) {
+                    if (!dbIdToJobNameToSyncJobs.containsKey(syncJob.getDbId())) {
+                        continue;
+                    }
+                    Map<String, List<SyncJob>> map = dbIdToJobNameToSyncJobs.get(syncJob.getDbId());
+                    List<SyncJob> list = map.get(syncJob.getJobName());
+                    list.remove(syncJob);
+                    if (list.isEmpty()) {
+                        map.remove(syncJob.getJobName());
+                    }
+                    if (map.isEmpty()) {
+                        dbIdToJobNameToSyncJobs.remove(syncJob.getDbId());
+                    }
+                    iterator.remove();
+                    LOG.info(new LogBuilder(LogKey.SYNC_JOB, syncJob.getId())
+                            .add("finishTimeMs", syncJob.getFinishTimeMs())
+                            .add("currentTimeMs", currentTimeMs)
+                            .add("jobState", syncJob.getJobState())
+                            .add("msg", "old sync job has been cleaned")
+                    );
+                }
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
     public SyncJob getSyncJobById(long jobId) {
         return idToSyncJob.get(jobId);
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
index 8fa080f..457fe43 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
@@ -405,5 +405,41 @@ public class SyncJobManagerTest {
         Assert.assertEquals(MsgType.USER_CANCEL, canalSyncJob.getFailMsg().getMsgType());
     }
 
+    @Test
+    public void testCleanOldSyncJobs() {
+        SyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+        // change sync job state to cancelled
+        try {
+            canalSyncJob.updateState(JobState.CANCELLED, false);
+        } catch (UserException e) {
+            Assert.fail();
+        }
+        Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+
+        SyncJobManager manager = new SyncJobManager();
+
+        // add a sync job to manager
+        Map<Long, SyncJob> idToSyncJob = Maps.newHashMap();
+        idToSyncJob.put(jobId, canalSyncJob);
+        Map<Long, Map<String, List<SyncJob>>> dbIdToJobNameToSyncJobs = Maps.newHashMap();
+        Map<String, List<SyncJob>> jobNameToSyncJobs = Maps.newHashMap();
+        jobNameToSyncJobs.put(jobName, Lists.newArrayList(canalSyncJob));
+        dbIdToJobNameToSyncJobs.put(dbId, jobNameToSyncJobs);
+
+        Deencapsulation.setField(manager, "idToSyncJob", idToSyncJob);
+        Deencapsulation.setField(manager, "dbIdToJobNameToSyncJobs", dbIdToJobNameToSyncJobs);
+
+        new Expectations(canalSyncJob) {
+            {
+                canalSyncJob.isExpired(anyLong);
+                result = true;
+            }
+        };
+        manager.cleanOldSyncJobs();
+
+        Assert.assertEquals(0, idToSyncJob.size());
+        Assert.assertEquals(0, dbIdToJobNameToSyncJobs.size());
+    }
+
 
 }
\ No newline at end of file
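
For context: two details in cleanOldSyncJobs are worth noting. Expiry is computed from finishTimeMs against Config.label_keep_max_second, and removal goes through Iterator.remove(), since deleting from idToSyncJob any other way while iterating it would throw ConcurrentModificationException. A minimal sketch of that removal pattern (Job is a hypothetical stand-in for SyncJob):

```
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class ExpireCleanupSketch {
    static final long KEEP_MS = 3 * 24 * 3600 * 1000L; // ~ label_keep_max_second

    record Job(long id, long finishTimeMs) {
        boolean isExpired(long nowMs) {
            return finishTimeMs != -1L && nowMs - finishTimeMs > KEEP_MS;
        }
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Map<Long, Job> idToJob = new HashMap<>();
        idToJob.put(1L, new Job(1L, now - 4 * 24 * 3600 * 1000L)); // expired
        idToJob.put(2L, new Job(2L, now));                          // fresh

        // Iterator.remove() is the only safe way to delete entries from
        // the map being iterated; idToJob.remove(key) here would throw
        // ConcurrentModificationException on the next call to next().
        Iterator<Map.Entry<Long, Job>> it = idToJob.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().isExpired(now)) {
                it.remove();
            }
        }
        System.out.println(idToJob.keySet()); // [2]
    }
}
```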

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 02/09: [RoutineLoad] Add "runningTxns" field in SHOW ROUTINE LOAD result (#6986)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 42c93ff56bd4c097a178ddcd3757c8f224711704
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu Nov 11 15:41:13 2021 +0800

    [RoutineLoad] Add "runningTxns" field in SHOW ROUTINE LOAD result (#6986)
    
    Add a new field `runningTxns` in the result of `SHOW ROUTINE LOAD`, e.g.:
    
    ```
                      Id: 11001
                    Name: test4
              CreateTime: 2021-11-02 00:04:54
               PauseTime: NULL
                 EndTime: NULL
                  DbName: default_cluster:db1
               TableName: tbl1
                   State: RUNNING
          DataSourceType: KAFKA
          CurrentTaskNum: 1
           JobProperties: {xxx}
        CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"test4"}
               Statistic: {"receivedBytes":6,"runningTxns":[1001, 1002],"errorRows":0,"committedTaskNum":1,"loadedRows":2,"loadRowsRate":0,"abortedTaskNum":13,"errorRowsAfterResumed":0,"totalRows":2,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":20965}
                Progress: {"0":"10"}
    ReasonOfStateChanged:
            ErrorLogUrls:
                OtherMsg:
    ```
    
    So that users can view the status of the corresponding transactions of this job by executing `show transaction where id=xx`;
---
 .../java/org/apache/doris/load/routineload/RoutineLoadJob.java     | 2 ++
 .../org/apache/doris/load/routineload/RoutineLoadStatistic.java    | 7 +++++++
 .../org/apache/doris/load/routineload/RoutineLoadTaskInfo.java     | 1 +
 3 files changed, 10 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index b566029..a45b581 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -912,6 +912,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         
         writeLock();
         try {
+            this.jobStatistic.runningTxnIds.remove(txnState.getTransactionId());
             if (state != JobState.RUNNING) {
                 // job is not running, nothing need to be done
                 return;
@@ -963,6 +964,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
             throws UserException {
         long taskBeId = -1L;
         try {
+            this.jobStatistic.runningTxnIds.remove(txnState.getTransactionId());
             if (txnOperated) {
                 // step0: find task in job
                 Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
index c0b3b06..7a5ad3a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
@@ -22,12 +22,14 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.gson.annotations.SerializedName;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 
 public class RoutineLoadStatistic implements Writable {
     /*
@@ -62,6 +64,10 @@ public class RoutineLoadStatistic implements Writable {
     @SerializedName(value = "abortedTaskNum")
     public long abortedTaskNum = 0;
 
+    // Save all currently running transactions, including PREPARE and COMMITTED.
+    // No need to persist, only for tracing txn of routine load job.
+    public Set<Long> runningTxnIds = Sets.newHashSet();
+
     @Override
     public void write(DataOutput out) throws IOException {
         String json = GsonUtils.GSON.toJson(this);
@@ -87,6 +93,7 @@ public class RoutineLoadStatistic implements Writable {
                 / this.totalTaskExcutionTimeMs * 1000));
         summary.put("committedTaskNum", Long.valueOf(this.committedTaskNum));
         summary.put("abortedTaskNum", Long.valueOf(this.abortedTaskNum));
+        summary.put("runningTxns", runningTxnIds);
         return summary;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index b535b94..50acd8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -189,6 +189,7 @@ public abstract class RoutineLoadTaskInfo {
                     DebugUtil.printId(id), jobId, e);
             throw e;
         }
+        routineLoadJob.jobStatistic.runningTxnIds.add(txnId);
         return true;
     }
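
For context: runningTxnIds is an in-memory Set that is not persisted; tasks add the txn id when a transaction begins, and the commit/abort callbacks remove it. The summary map is then serialized with Gson, which renders the Set as the JSON array seen in the example output above. A small sketch of that serialization step (assuming Gson, which the FE already uses):

```
import com.google.gson.Gson;

import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class RunningTxnsSummarySketch {
    public static void main(String[] args) {
        Set<Long> runningTxnIds = new LinkedHashSet<>();
        runningTxnIds.add(1001L); // added when a task's txn begins
        runningTxnIds.add(1002L);

        Map<String, Object> summary = new LinkedHashMap<>();
        summary.put("receivedBytes", 6L);
        summary.put("runningTxns", runningTxnIds); // serialized as a JSON array

        // Prints: {"receivedBytes":6,"runningTxns":[1001,1002]}
        System.out.println(new Gson().toJson(summary));
    }
}
```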
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 09/09: [Bug] Fix bug with using tableId to get table in publish version (#7091)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit fe8f299c21947f00a8a2302a80df4620909809db
Author: Userwhite <49...@users.noreply.github.com>
AuthorDate: Fri Nov 12 10:56:33 2021 +0800

    [Bug] Fix bug with using tableId to get table in publish version (#7091)
    
    If a table has been dropped when finishing the txn, skip it.
---
 .../src/main/java/org/apache/doris/catalog/Database.java | 16 ++++++++++++++++
 .../apache/doris/transaction/DatabaseTransactionMgr.java |  2 +-
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index cfc89f7..128e9af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -406,6 +406,22 @@ public class Database extends MetaObject implements Writable {
         return tableList;
     }
 
+    public List<Table> getTablesOnIdOrderWithIgnoringWrongTableId(List<Long> tableIdList) {
+        List<Table> tableList = Lists.newArrayList();
+        for (Long tableId : tableIdList) {
+            Table table = idToTable.get(tableId);
+            if (table == null) {
+                LOG.warn("unknown table, tableId=" + tableId);
+                continue;
+            }
+            tableList.add(table);
+        }
+        if (tableList.size() > 1) {
+            return tableList.stream().sorted(Comparator.comparing(Table::getId)).collect(Collectors.toList());
+        }
+        return tableList;
+    }
+
     public Set<String> getTableNamesWithLock() {
         readLock();
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index cdb73f4..d09fe31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -717,7 +717,7 @@ public class DatabaseTransactionMgr {
            }
         }
 
-        List<Table> tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
+        List<Table> tableList = db.getTablesOnIdOrderWithIgnoringWrongTableId(tableIdList);
         MetaLockUtils.writeLockTables(tableList);
         try {
             boolean hasError = false;
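
For context: the new getTablesOnIdOrderWithIgnoringWrongTableId skips ids whose table has been dropped instead of throwing, and still returns the survivors sorted by table id, presumably so that MetaLockUtils.writeLockTables keeps a consistent lock order across threads. A minimal sketch of the skip-and-sort behavior (Table here is a hypothetical stand-in):

```
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class SkipDroppedTablesSketch {
    record Table(long id, String name) {}

    static List<Table> tablesOnIdOrderIgnoringMissing(Map<Long, Table> idToTable,
                                                      List<Long> tableIdList) {
        List<Table> result = new ArrayList<>();
        for (Long tableId : tableIdList) {
            Table t = idToTable.get(tableId);
            if (t == null) {
                // The table was dropped while the txn was in flight:
                // skip it instead of failing the whole publish step.
                continue;
            }
            result.add(t);
        }
        result.sort(Comparator.comparingLong(Table::id)); // stable lock order
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Table> idToTable = Map.of(
                2L, new Table(2L, "t2"),
                1L, new Table(1L, "t1"));
        // Id 99 was dropped; the result is [t1, t2], not an exception.
        System.out.println(tablesOnIdOrderIgnoringMissing(idToTable, List.of(2L, 99L, 1L)));
    }
}
```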

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 08/09: [BUG] Fix CacheAnalyzer's bug when an aggregate column contains an expression. (#7085)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 130ed3557b8240d266d4b1e01d7d689e65b4b820
Author: 曹建华 <ca...@bytedance.com>
AuthorDate: Fri Nov 12 10:54:24 2021 +0800

    [BUG] Fix CacheAnalyzer's bug when an aggregate column contains an expression. (#7085)
    
    When partition_cache is enabled, if a query's aggregate columns contain an expression,
    CacheAnalyzer may throw an exception and cause the query to fail.
---
 fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
index e651d00..f2c88df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
@@ -399,6 +399,10 @@ public class CacheAnalyzer {
             groupbyCount += 1;
             boolean matched = false;
             for (Expr groupExpr : groupExprs) {
+                if (!(groupExpr instanceof SlotRef)) {
+                    continue;
+                }
+
                 SlotRef slot = (SlotRef) groupExpr;
                 if (partColumn.getName().equals(slot.getColumnName())) {
                     matched = true;
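
For context: group-by expressions are not necessarily SlotRefs (e.g. a function call like date_format(dt, ...) wraps the column), and the unconditional cast threw ClassCastException, failing the query rather than just skipping the partition cache. A minimal sketch of the instanceof guard, with a hypothetical Expr hierarchy:

```
import java.util.List;

public class InstanceofGuardSketch {
    interface Expr {}
    record SlotRef(String columnName) implements Expr {}
    record FunctionCall(String fn, Expr child) implements Expr {}

    static boolean partitionColumnMatched(String partColumn, List<Expr> groupExprs) {
        for (Expr groupExpr : groupExprs) {
            if (!(groupExpr instanceof SlotRef slot)) {
                continue; // the fix: skip non-SlotRef exprs instead of casting
            }
            if (partColumn.equals(slot.columnName())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Expr> groupExprs = List.of(
                new FunctionCall("date_format", new SlotRef("dt")), // expression
                new SlotRef("dt"));                                 // plain column
        // Before the fix the FunctionCall entry caused a ClassCastException;
        // now it is skipped and the plain SlotRef still matches.
        System.out.println(partitionColumnMatched("dt", groupExprs)); // true
    }
}
```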

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org