Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2023/01/17 07:03:14 UTC

[GitHub] [doris] adonis0147 opened a new pull request, #16006: [fix](MTMV) Refine the process of refreshing data

adonis0147 opened a new pull request, #16006:
URL: https://github.com/apache/doris/pull/16006

   # Proposed changes
   
   1. Remove some redundant code.
   2. Fix the issue with the state of MTMV tasks.
   3. Fix the test case `test_create_mtmv`.
   
   ## Problem summary
   
   1. We use a retry policy to re-run failed MTMV tasks, but the task state was marked as failed on every failed attempt while the task was still being retried. It should be marked as failed only after all retry attempts have failed.
   2. Some redundant code can be removed.
   3. The test case `test_create_mtmv` created many background tasks to refresh the data. Some of these tasks could fail due to concurrency and cause the test to fail. A single task is enough to verify the functionality.
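The corrected retry behavior in point 1 can be illustrated with a minimal Java sketch. It is not the code from `MTMVTaskExecutorPool`: `RetrySketch`, `runWithRetry`, and the `BooleanSupplier` standing in for `taskExecutor.executeTask()` are hypothetical, and the `TaskState` enum only mirrors the state names used in this PR. What it shows is that a failed attempt leaves the state untouched and `FAILURE` is assigned once, after the last attempt.

```java
import java.util.function.BooleanSupplier;

public class RetrySketch {
    // Hypothetical enum mirroring the state names used in this PR.
    public enum TaskState { RUNNING, SUCCESS, FAILURE }

    // Runs `attempt` up to `maxAttempts` times. A failed attempt does not change
    // the result; FAILURE is returned only after every attempt has failed.
    public static TaskState runWithRetry(BooleanSupplier attempt, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            try {
                if (attempt.getAsBoolean()) {
                    return TaskState.SUCCESS;
                }
            } catch (RuntimeException e) {
                // Log and move on to the next attempt instead of marking FAILURE here.
                System.err.println("Attempt " + (i + 1) + " failed: " + e.getMessage());
            }
        }
        return TaskState.FAILURE;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first two attempts and succeeds on the third.
        TaskState state = runWithRetry(() -> ++calls[0] >= 3, 3);
        System.out.println(state + " after " + calls[0] + " attempts"); // prints "SUCCESS after 3 attempts"
    }
}
```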
   
   ## Checklist (Required)
   
   1. Does it affect the original behavior:
       - [ ] Yes
       - [ ] No
       - [x] I don't know
   2. Have unit tests been added:
       - [x] Yes
       - [ ] No
       - [ ] No Need
   3. Has documentation been added or modified:
       - [ ] Yes
       - [ ] No
       - [x] No Need
   4. Does it need to update dependencies:
       - [ ] Yes
       - [x] No
   5. Are there any changes that cannot be rolled back:
       - [ ] Yes (If Yes, please explain WHY)
       - [x] No
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] adonis0147 commented on a diff in pull request #16006: [fix](MTMV) Refine the process of refreshing data

Posted by GitBox <gi...@apache.org>.
adonis0147 commented on code in PR #16006:
URL: https://github.com/apache/doris/pull/16006#discussion_r1071903752


##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java:
##########
@@ -53,20 +55,22 @@ public void executeTask(MTMVTaskExecutor taskExecutor) {
                     isSuccess = taskExecutor.executeTask();
                     if (isSuccess) {
                         task.setState(TaskState.SUCCESS);
-                    } else {
-                        task.setState(TaskState.FAILED);
+                        break;
                     }
-                } catch (Exception ex) {
-                    LOG.warn("failed to execute task.", ex);
-                } finally {
-                    task.setFinishTime(MTMVUtils.getNowTimeStamp());
+                } catch (Throwable t) {
+                    LOG.warn("Failed to execute the task, taskId=" + task.getTaskId() + ".", t);
                 }
                 retryTimes--;

Review Comment:
   Added.





[GitHub] [doris] chenlinzhong commented on a diff in pull request #16006: [fix](MTMV) Refine the process of refreshing data

Posted by GitBox <gi...@apache.org>.
chenlinzhong commented on code in PR #16006:
URL: https://github.com/apache/doris/pull/16006#discussion_r1071919424


##########
fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java:
##########
@@ -137,29 +137,17 @@ public void processDropMaterializedView(DropMaterializedViewStmt stmt) throws Dd
         if (!stmt.isForMTMV() && stmt.getTableName() == null) {
             throw new DdlException("Drop materialized view without table name is unsupported : " + stmt.toSql());
         }
-        TableName tableName = !stmt.isForMTMV() ? stmt.getTableName() : stmt.getMTMVName();
-        Database db;
-        OlapTable olapTable;
-        if (stmt.isIfExists()) {
-            try {
-                String dbName = tableName.getDb();
-                db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
-                String name = tableName.getTbl();
-                olapTable = (OlapTable) db.getTableOrMetaException(name,
-                        !stmt.isForMTMV() ? TableType.OLAP : TableType.MATERIALIZED_VIEW);
-            } catch (Exception e) {
-                LOG.info("db or table not exists, msg={}", e.getMessage());

Review Comment:
   DROP MATERIALIZED VIEW IF EXISTS mv;
   
   If mv does not exist, does the above SQL still return OK?
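For reference, the usual SQL convention (as with `DROP TABLE IF EXISTS` in MySQL) is that dropping a missing object with `IF EXISTS` is a no-op that still succeeds, while the same statement without `IF EXISTS` is an error. A minimal Java sketch of that contract, assuming a hypothetical in-memory set in place of the real database and table lookup in `Alter.java`; it does not show how Doris itself implements the check.

```java
import java.util.HashSet;
import java.util.Set;

public class DropIfExistsSketch {
    // Hypothetical in-memory catalog standing in for the real db/table lookup.
    private final Set<String> views = new HashSet<>();

    public void create(String name) {
        views.add(name);
    }

    // With IF EXISTS, dropping a missing view is a no-op and the statement
    // succeeds; without IF EXISTS, a missing view is reported as an error.
    public void drop(String name, boolean ifExists) {
        boolean removed = views.remove(name);
        if (!removed && !ifExists) {
            throw new IllegalStateException("Materialized view does not exist: " + name);
        }
    }

    public static void main(String[] args) {
        DropIfExistsSketch catalog = new DropIfExistsSketch();
        catalog.drop("mv", true); // DROP MATERIALIZED VIEW IF EXISTS mv; -> OK
        try {
            catalog.drop("mv", false); // DROP MATERIALIZED VIEW mv; -> error
        } catch (IllegalStateException e) {
            System.out.println("Error: " + e.getMessage());
        }
    }
}
```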





[GitHub] [doris] SaintBacchus commented on a diff in pull request #16006: [fix](MTMV) Refine the process of refreshing data

Posted by GitBox <gi...@apache.org>.
SaintBacchus commented on code in PR #16006:
URL: https://github.com/apache/doris/pull/16006#discussion_r1071936581


##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java:
##########
@@ -53,20 +57,31 @@ public void executeTask(MTMVTaskExecutor taskExecutor) {
                     isSuccess = taskExecutor.executeTask();
                     if (isSuccess) {
                         task.setState(TaskState.SUCCESS);
-                    } else {
-                        task.setState(TaskState.FAILED);
+                        break;
                     }
-                } catch (Exception ex) {
-                    LOG.warn("failed to execute task.", ex);
-                } finally {
-                    task.setFinishTime(MTMVUtils.getNowTimeStamp());
+                } catch (Throwable t) {
+                    LOG.warn("Failed to execute the task, taskId=" + task.getTaskId() + ".", t);
                 }
                 retryTimes--;
-            } while (!isSuccess && retryTimes >= 0);
+
+                if (retryTimes > 0) {
+                    try {
+                        Thread.sleep(RETRY_INTERVAL);
+                    } catch (InterruptedException e) {
+                        LOG.warn("Failed to sleep.", e);
+                        break;
+                    }
+                }
+            } while (!isSuccess && retryTimes > 0);
             if (!isSuccess) {
-                task.setState(TaskState.FAILED);
+                task.setState(TaskState.FAILURE);
                 task.setErrorCode(-1);
             }
+            task.setFinishTime(MTMVUtils.getNowTimeStamp());
+
+            ChangeMTMVTask changeTask = new ChangeMTMVTask(taskExecutor.getJob().getId(), task, TaskState.RUNNING,
+                    task.getState());
+            Env.getCurrentEnv().getEditLog().logAlterScheduleTask(changeTask);

Review Comment:
   Add a try-catch here; otherwise the error may be lost in the thread.
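A minimal Java sketch of the suggested try-catch, assuming the finishing code runs inside a task submitted to an `ExecutorService` with `submit()`. In that case an uncaught exception is stored in the returned `Future` and is only visible to whoever calls `get()`, so it can go unnoticed. `EditLog.logTaskChange` is a hypothetical stand-in for `logAlterScheduleTask`, and the diff does not show whether the real pool uses `submit()` or `execute()`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LogErrorInThreadSketch {
    // Hypothetical stand-in for the edit-log write done at the end of a task.
    public interface EditLog {
        void logTaskChange(String taskId, String newState);
    }

    // Records the last error so the example can show it was not swallowed.
    public static final AtomicReference<Throwable> lastError = new AtomicReference<>();

    public static Runnable finishTask(String taskId, String state, EditLog editLog) {
        return () -> {
            try {
                editLog.logTaskChange(taskId, state);
            } catch (Throwable t) {
                // Without this catch, the exception would end up inside the Future
                // returned by submit() and never be logged.
                lastError.set(t);
                System.err.println("Failed to persist the state of task " + taskId + ": " + t);
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(finishTask("t1", "FAILURE", (id, s) -> {
            throw new RuntimeException("edit log unavailable");
        }));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("Recorded error: " + lastError.get().getMessage());
    }
}
```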





[GitHub] [doris] github-actions[bot] commented on pull request #16006: [fix](MTMV) Refine the process of refreshing data

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #16006:
URL: https://github.com/apache/doris/pull/16006#issuecomment-1385570331

   PR approved by at least one committer and no changes requested.




[GitHub] [doris] morningman merged pull request #16006: [fix](MTMV) Refine the process of refreshing data

Posted by GitBox <gi...@apache.org>.
morningman merged PR #16006:
URL: https://github.com/apache/doris/pull/16006




[GitHub] [doris] github-actions[bot] commented on pull request #16006: [fix](MTMV) Refine the process of refreshing data

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #16006:
URL: https://github.com/apache/doris/pull/16006#issuecomment-1385570408

   PR approved by anyone and no changes requested.




[GitHub] [doris] adonis0147 commented on a diff in pull request #16006: [fix](MTMV) Refine the process of refreshing data

Posted by GitBox <gi...@apache.org>.
adonis0147 commented on code in PR #16006:
URL: https://github.com/apache/doris/pull/16006#discussion_r1071903491


##########
fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java:
##########
@@ -137,29 +137,17 @@ public void processDropMaterializedView(DropMaterializedViewStmt stmt) throws Dd
         if (!stmt.isForMTMV() && stmt.getTableName() == null) {
             throw new DdlException("Drop materialized view without table name is unsupported : " + stmt.toSql());
         }
-        TableName tableName = !stmt.isForMTMV() ? stmt.getTableName() : stmt.getMTMVName();
-        Database db;
-        OlapTable olapTable;
-        if (stmt.isIfExists()) {
-            try {
-                String dbName = tableName.getDb();
-                db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
-                String name = tableName.getTbl();
-                olapTable = (OlapTable) db.getTableOrMetaException(name,
-                        !stmt.isForMTMV() ? TableType.OLAP : TableType.MATERIALIZED_VIEW);
-            } catch (Exception e) {
-                LOG.info("db or table not exists, msg={}", e.getMessage());

Review Comment:
   > Why remove the code?
   
   Because this code is redundant and can be simplified.





[GitHub] [doris] chenlinzhong commented on a diff in pull request #16006: [fix](MTMV) Refine the process of refreshing data

Posted by GitBox <gi...@apache.org>.
chenlinzhong commented on code in PR #16006:
URL: https://github.com/apache/doris/pull/16006#discussion_r1071869289


##########
fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java:
##########
@@ -137,29 +137,17 @@ public void processDropMaterializedView(DropMaterializedViewStmt stmt) throws Dd
         if (!stmt.isForMTMV() && stmt.getTableName() == null) {
             throw new DdlException("Drop materialized view without table name is unsupported : " + stmt.toSql());
         }
-        TableName tableName = !stmt.isForMTMV() ? stmt.getTableName() : stmt.getMTMVName();
-        Database db;
-        OlapTable olapTable;
-        if (stmt.isIfExists()) {
-            try {
-                String dbName = tableName.getDb();
-                db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
-                String name = tableName.getTbl();
-                olapTable = (OlapTable) db.getTableOrMetaException(name,
-                        !stmt.isForMTMV() ? TableType.OLAP : TableType.MATERIALIZED_VIEW);
-            } catch (Exception e) {
-                LOG.info("db or table not exists, msg={}", e.getMessage());

Review Comment:
   Why remove the code?







[GitHub] [doris] SaintBacchus commented on a diff in pull request #16006: [fix](MTMV) Refine the process of refreshing data

Posted by GitBox <gi...@apache.org>.
SaintBacchus commented on code in PR #16006:
URL: https://github.com/apache/doris/pull/16006#discussion_r1071839021


##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskProcessor.java:
##########
@@ -17,241 +17,139 @@
 
 package org.apache.doris.mtmv;
 
-import org.apache.doris.analysis.SqlParser;
-import org.apache.doris.analysis.SqlScanner;
-import org.apache.doris.analysis.StatementBase;
-import org.apache.doris.analysis.UserIdentity;
-import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedView;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.cluster.ClusterNamespace;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.util.SqlParserUtils;
-import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.mtmv.MTMVUtils.TaskState;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.system.SystemInfoService;
 
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.StringReader;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 
 public class MTMVTaskProcessor {
     private static final Logger LOG = LogManager.getLogger(MTMVTaskProcessor.class);
     private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
-    private ConnectContext context;
 
-    void process(MTMVTaskContext context) throws Exception {
+    boolean process(MTMVTaskContext context) throws Exception {
         String taskId = context.getTask().getTaskId();
         long jobId = context.getJob().getId();
-        LOG.info("run mtmv logic start, task_id:{}, jobid:{}", taskId, jobId);
-        String tableName = context.getTask().getMvName();
-        String tmpTableName = genTmpTableName(tableName);
-        DatabaseIf db = Env.getCurrentEnv().getCatalogMgr().getCatalog(InternalCatalog.INTERNAL_CATALOG_NAME)
-                .getDbOrAnalysisException(context.getTask().getDbName());
-        MaterializedView table = (MaterializedView) db.getTableOrAnalysisException(tableName);
-        if (!table.tryMvTaskLock()) {
-            LOG.warn("run mtmv task  failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, "get lock fail");
-            return;
+        LOG.info("Start to run a MTMV task, taskId={}, jobId={}.", taskId, jobId);
+
+        String mvName = context.getTask().getMVName();
+        String temporaryMVName = getTemporaryMVName(mvName);
+        Database db = context.getCtx().getEnv().getInternalCatalog()
+                .getDbOrMetaException(context.getTask().getDBName());
+        MaterializedView mv = (MaterializedView) db.getTableOrAnalysisException(mvName);
+
+        if (!mv.tryLockMVTask()) {
+            LOG.warn("Failed to run the MTMV task, taskId={}, jobId={}, msg={}.", taskId, jobId,
+                    "Failed to get the lock");
+            context.getTask().setMessage("Failed to get the lock.");
+            return false;
         }
         try {
-            //step1 create tmp table
-            String tmpCreateTableStmt = genCreateTempMaterializedViewStmt(context, tableName, tmpTableName);
-            //check whther tmp table exists, if exists means run mtmv task failed before, so need to drop it first
-            if (db.isTableExist(tmpTableName)) {
-                String dropStml = genDropStml(context, tmpTableName);
-                ConnectContext dropResult = execSQL(context, dropStml);
-                LOG.info("exec drop table stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, dropStml,
-                        dropResult.getState(), dropResult.getState().getInfoMessage());
-            }
-            ConnectContext createTempTableResult = execSQL(context, tmpCreateTableStmt);
-            LOG.info("exec tmp table stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, tmpCreateTableStmt,
-                    createTempTableResult.getState(), createTempTableResult.getState().getInfoMessage());
-            if (createTempTableResult.getState().getStateType() != QueryState.MysqlStateType.OK) {
-                throw new Throwable("create tmp table failed, sql:" + tmpCreateTableStmt);
+            // Check whether the temporary materialized view exists, we should drop the obsolete materialized view first
+            // because it was created by previous tasks which failed to complete their work.
+            dropMaterializedView(context, temporaryMVName);
+
+            // Step 1: create the temporary materialized view.
+            String createStatement = generateCreateStatement(mv.clone(temporaryMVName));
+            if (!executeSQL(context, createStatement)) {
+                throw new RuntimeException(
+                        "Failed to create the temporary materialized view, sql=" + createStatement + ".");
             }
 
-            //step2 insert data to tmp table
-            String insertStmt = genInsertIntoStmt(context, tmpTableName);
-            ConnectContext insertDataResult = execSQL(context, insertStmt);
-            LOG.info("exec insert into stmt, taskid:{}, stmt:{}, ret:{}, msg:{}, effected_row:{}", taskId, insertStmt,
-                    insertDataResult.getState(), insertDataResult.getState().getInfoMessage(),
-                    insertDataResult.getState().getAffectedRows());
-            if (insertDataResult.getState().getStateType() != QueryState.MysqlStateType.OK) {
-                throw new Throwable("insert data failed, sql:" + insertStmt);
+            // Step 2: insert data to the temporary materialized view.
+            String insertSelectStatement = generateInsertSelectStmt(context, temporaryMVName);
+            if (!executeSQL(context, insertSelectStatement)) {
+                throw new RuntimeException(
+                        "Failed to insert data to the temporary materialized view, sql=" + insertSelectStatement + ".");
             }
-
-            //step3 swap tmp table with origin table
-            String swapStmt = genSwapStmt(context, tableName, tmpTableName);
-            ConnectContext swapResult = execSQL(context, swapStmt);
-            LOG.info("exec swap stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, swapStmt, swapResult.getState(),
-                    swapResult.getState().getInfoMessage());
-            if (swapResult.getState().getStateType() != QueryState.MysqlStateType.OK) {
-                throw new Throwable("swap table failed, sql:" + swapStmt);
+            String insertInfoMessage = context.getCtx().getState().getInfoMessage();
+
+            // Step 3: swap the temporary materialized view with the original materialized view.
+            String swapStatement = generateSwapStatement(mvName, temporaryMVName);
+            if (!executeSQL(context, swapStatement)) {
+                throw new RuntimeException(
+                        "Failed to swap the temporary materialized view with the original materialized view, sql="
+                                + swapStatement + ".");
             }
-            //step4 update task info
-            context.getTask().setMessage(insertDataResult.getState().getInfoMessage());
-            context.getTask().setState(TaskState.SUCCESS);
-            LOG.info("run mtmv task success, task_id:{},jobid:{}", taskId, jobId);
-        } catch (AnalysisException e) {
-            LOG.warn("run mtmv task failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, e.getMessage());
-            context.getTask().setMessage("run task failed, caused by " + e.getMessage());
-            context.getTask().setState(TaskState.FAILED);
+
+            context.getTask().setMessage(insertInfoMessage);
+            LOG.info("Run MTMV task successfully, taskId={}, jobId={}.", taskId, jobId);
+            return true;
         } catch (Throwable e) {
-            LOG.warn("run mtmv task failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, e.getMessage());
-            context.getTask().setMessage("run task failed, caused by " + e.getMessage());
-            context.getTask().setState(TaskState.FAILED);
+            context.getTask().setMessage(e.getMessage());

Review Comment:
   Maybe call `setMessage` outside, because in `MTMVTaskExecutorPool.java` you can still get the runtime exception message.
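The suggestion amounts to letting `process` throw and having the caller record the message in one place. A minimal Java sketch, assuming each refresh step (create, insert, swap) is represented by a hypothetical `BooleanSupplier` in place of `executeSQL(context, statement)`; the `Task` class is likewise a simplified stand-in, not the real task class.

```java
import java.util.function.BooleanSupplier;

public class SetMessageOutsideSketch {
    // Simplified, hypothetical task that only carries a status message.
    public static class Task {
        private String message;

        public void setMessage(String message) {
            this.message = message;
        }

        public String getMessage() {
            return message;
        }
    }

    // Hypothetical processor: it never touches the task message; a failing step
    // throws a RuntimeException describing what went wrong.
    static boolean process(BooleanSupplier create, BooleanSupplier insert, BooleanSupplier swap) {
        if (!create.getAsBoolean()) {
            throw new RuntimeException("Failed to create the temporary materialized view.");
        }
        if (!insert.getAsBoolean()) {
            throw new RuntimeException("Failed to insert data to the temporary materialized view.");
        }
        if (!swap.getAsBoolean()) {
            throw new RuntimeException("Failed to swap the temporary materialized view with the original one.");
        }
        return true;
    }

    // The caller (the executor pool in this PR) catches the exception and sets
    // the message, so the error text is recorded in a single place.
    public static boolean execute(Task task, BooleanSupplier create, BooleanSupplier insert,
            BooleanSupplier swap) {
        try {
            return process(create, insert, swap);
        } catch (RuntimeException e) {
            task.setMessage(e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        Task task = new Task();
        boolean ok = execute(task, () -> true, () -> false, () -> true);
        System.out.println(ok + ": " + task.getMessage());
    }
}
```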



##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java:
##########
@@ -53,20 +55,22 @@ public void executeTask(MTMVTaskExecutor taskExecutor) {
                     isSuccess = taskExecutor.executeTask();
                     if (isSuccess) {
                         task.setState(TaskState.SUCCESS);
-                    } else {
-                        task.setState(TaskState.FAILED);
+                        break;
                     }
-                } catch (Exception ex) {
-                    LOG.warn("failed to execute task.", ex);
-                } finally {
-                    task.setFinishTime(MTMVUtils.getNowTimeStamp());
+                } catch (Throwable t) {
+                    LOG.warn("Failed to execute the task, taskId=" + task.getTaskId() + ".", t);
                 }
                 retryTimes--;

Review Comment:
   Maybe add a sleep here to avoid retrying too quickly?
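A minimal Java sketch of retrying with a fixed pause between attempts and no pause after the last one. `retry` and `retryIntervalMs` are hypothetical names; a later revision of this PR uses a `RETRY_INTERVAL` constant whose value is not shown. Unlike that revision, which logs and breaks on `InterruptedException`, the sketch also restores the interrupt flag with `Thread.currentThread().interrupt()`, the common Java idiom that lets the owning thread or pool still observe the interruption.

```java
import java.util.function.BooleanSupplier;

public class RetryWithBackoffSketch {
    // Retries `attempt` up to maxAttempts times, sleeping retryIntervalMs between
    // attempts but not after the last one. Returns whether any attempt succeeded.
    public static boolean retry(BooleanSupplier attempt, int maxAttempts, long retryIntervalMs) {
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(retryIntervalMs);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers can see it, then stop retrying.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        boolean ok = retry(() -> false, 3, 50);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // Three failed attempts with two 50 ms pauses in between.
        System.out.println("ok=" + ok + ", elapsed about " + elapsedMs + " ms");
    }
}
```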





[GitHub] [doris] hello-stephen commented on pull request #16006: [fix](MTMV) Refine the process of refreshing data

Posted by GitBox <gi...@apache.org>.
hello-stephen commented on PR #16006:
URL: https://github.com/apache/doris/pull/16006#issuecomment-1385206131

   TeamCity pipeline, clickbench performance test result:
    the sum of best hot time: 35.12 seconds
    load time: 501 seconds
    storage size: 17122526285 Bytes
    https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/tmp/20230117104413_clickbench_pr_82296.html




[GitHub] [doris] adonis0147 commented on a diff in pull request #16006: [fix](MTMV) Refine the process of refreshing data

Posted by GitBox <gi...@apache.org>.
adonis0147 commented on code in PR #16006:
URL: https://github.com/apache/doris/pull/16006#discussion_r1071908970


##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskProcessor.java:
##########
@@ -17,241 +17,139 @@
 
 package org.apache.doris.mtmv;
 
-import org.apache.doris.analysis.SqlParser;
-import org.apache.doris.analysis.SqlScanner;
-import org.apache.doris.analysis.StatementBase;
-import org.apache.doris.analysis.UserIdentity;
-import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedView;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.cluster.ClusterNamespace;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.util.SqlParserUtils;
-import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.mtmv.MTMVUtils.TaskState;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.system.SystemInfoService;
 
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.StringReader;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 
 public class MTMVTaskProcessor {
     private static final Logger LOG = LogManager.getLogger(MTMVTaskProcessor.class);
     private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
-    private ConnectContext context;
 
-    void process(MTMVTaskContext context) throws Exception {
+    boolean process(MTMVTaskContext context) throws Exception {
         String taskId = context.getTask().getTaskId();
         long jobId = context.getJob().getId();
-        LOG.info("run mtmv logic start, task_id:{}, jobid:{}", taskId, jobId);
-        String tableName = context.getTask().getMvName();
-        String tmpTableName = genTmpTableName(tableName);
-        DatabaseIf db = Env.getCurrentEnv().getCatalogMgr().getCatalog(InternalCatalog.INTERNAL_CATALOG_NAME)
-                .getDbOrAnalysisException(context.getTask().getDbName());
-        MaterializedView table = (MaterializedView) db.getTableOrAnalysisException(tableName);
-        if (!table.tryMvTaskLock()) {
-            LOG.warn("run mtmv task  failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, "get lock fail");
-            return;
+        LOG.info("Start to run a MTMV task, taskId={}, jobId={}.", taskId, jobId);
+
+        String mvName = context.getTask().getMVName();
+        String temporaryMVName = getTemporaryMVName(mvName);
+        Database db = context.getCtx().getEnv().getInternalCatalog()
+                .getDbOrMetaException(context.getTask().getDBName());
+        MaterializedView mv = (MaterializedView) db.getTableOrAnalysisException(mvName);
+
+        if (!mv.tryLockMVTask()) {
+            LOG.warn("Failed to run the MTMV task, taskId={}, jobId={}, msg={}.", taskId, jobId,
+                    "Failed to get the lock");
+            context.getTask().setMessage("Failed to get the lock.");
+            return false;
         }
         try {
-            //step1 create tmp table
-            String tmpCreateTableStmt = genCreateTempMaterializedViewStmt(context, tableName, tmpTableName);
-            //check whther tmp table exists, if exists means run mtmv task failed before, so need to drop it first
-            if (db.isTableExist(tmpTableName)) {
-                String dropStml = genDropStml(context, tmpTableName);
-                ConnectContext dropResult = execSQL(context, dropStml);
-                LOG.info("exec drop table stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, dropStml,
-                        dropResult.getState(), dropResult.getState().getInfoMessage());
-            }
-            ConnectContext createTempTableResult = execSQL(context, tmpCreateTableStmt);
-            LOG.info("exec tmp table stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, tmpCreateTableStmt,
-                    createTempTableResult.getState(), createTempTableResult.getState().getInfoMessage());
-            if (createTempTableResult.getState().getStateType() != QueryState.MysqlStateType.OK) {
-                throw new Throwable("create tmp table failed, sql:" + tmpCreateTableStmt);
+            // Check whether the temporary materialized view exists, we should drop the obsolete materialized view first
+            // because it was created by previous tasks which failed to complete their work.
+            dropMaterializedView(context, temporaryMVName);
+
+            // Step 1: create the temporary materialized view.
+            String createStatement = generateCreateStatement(mv.clone(temporaryMVName));
+            if (!executeSQL(context, createStatement)) {
+                throw new RuntimeException(
+                        "Failed to create the temporary materialized view, sql=" + createStatement + ".");
             }
 
-            //step2 insert data to tmp table
-            String insertStmt = genInsertIntoStmt(context, tmpTableName);
-            ConnectContext insertDataResult = execSQL(context, insertStmt);
-            LOG.info("exec insert into stmt, taskid:{}, stmt:{}, ret:{}, msg:{}, effected_row:{}", taskId, insertStmt,
-                    insertDataResult.getState(), insertDataResult.getState().getInfoMessage(),
-                    insertDataResult.getState().getAffectedRows());
-            if (insertDataResult.getState().getStateType() != QueryState.MysqlStateType.OK) {
-                throw new Throwable("insert data failed, sql:" + insertStmt);
+            // Step 2: insert data to the temporary materialized view.
+            String insertSelectStatement = generateInsertSelectStmt(context, temporaryMVName);
+            if (!executeSQL(context, insertSelectStatement)) {
+                throw new RuntimeException(
+                        "Failed to insert data to the temporary materialized view, sql=" + insertSelectStatement + ".");
             }
-
-            //step3 swap tmp table with origin table
-            String swapStmt = genSwapStmt(context, tableName, tmpTableName);
-            ConnectContext swapResult = execSQL(context, swapStmt);
-            LOG.info("exec swap stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, swapStmt, swapResult.getState(),
-                    swapResult.getState().getInfoMessage());
-            if (swapResult.getState().getStateType() != QueryState.MysqlStateType.OK) {
-                throw new Throwable("swap table failed, sql:" + swapStmt);
+            String insertInfoMessage = context.getCtx().getState().getInfoMessage();
+
+            // Step 3: swap the temporary materialized view with the original materialized view.
+            String swapStatement = generateSwapStatement(mvName, temporaryMVName);
+            if (!executeSQL(context, swapStatement)) {
+                throw new RuntimeException(
+                        "Failed to swap the temporary materialized view with the original materialized view, sql="
+                                + swapStatement + ".");
             }
-            //step4 update task info
-            context.getTask().setMessage(insertDataResult.getState().getInfoMessage());
-            context.getTask().setState(TaskState.SUCCESS);
-            LOG.info("run mtmv task success, task_id:{},jobid:{}", taskId, jobId);
-        } catch (AnalysisException e) {
-            LOG.warn("run mtmv task failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, e.getMessage());
-            context.getTask().setMessage("run task failed, caused by " + e.getMessage());
-            context.getTask().setState(TaskState.FAILED);
+
+            context.getTask().setMessage(insertInfoMessage);
+            LOG.info("Run MTMV task successfully, taskId={}, jobId={}.", taskId, jobId);
+            return true;
         } catch (Throwable e) {
-            LOG.warn("run mtmv task failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, e.getMessage());
-            context.getTask().setMessage("run task failed, caused by " + e.getMessage());
-            context.getTask().setState(TaskState.FAILED);
+            context.getTask().setMessage(e.getMessage());

Review Comment:
   I think it is OK. If a task failed before, we can check its message while it is running.
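The diff changes `process` to return a `boolean` instead of setting the task state itself. That lets the caller mark a task as failed only after every retry has failed, which is point 1 of the problem summary. The following is a minimal sketch of that control flow under stated assumptions. The in-memory `catalog` map, the `_tmp` suffix, `refresh`, `runWithRetry`, and the three-value `TaskState` enum are all hypothetical stand-ins, not Doris classes. No real SQL is executed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

public class MtmvRefreshSketch {
    // Hypothetical, simplified task states (not the Doris enum).
    enum TaskState { RUNNING, SUCCESS, FAILED }

    // Toy in-memory catalog: table name -> rows.
    static final Map<String, List<String>> catalog = new HashMap<>();

    // One refresh attempt, following the order used in the diff:
    // drop a stale temporary table, create it, insert data, then swap.
    // A null freshRows simulates a failed INSERT ... SELECT.
    static boolean refresh(String mvName, List<String> freshRows) {
        String tmpName = mvName + "_tmp";          // hypothetical naming scheme
        catalog.remove(tmpName);                   // drop leftovers of an earlier failed attempt
        catalog.put(tmpName, new ArrayList<>());   // step 1: create the temporary table
        if (freshRows == null) {
            return false;                          // original MV is left untouched
        }
        catalog.get(tmpName).addAll(freshRows);    // step 2: insert data
        // step 3: swap; here the temporary table simply replaces the original
        catalog.put(mvName, catalog.remove(tmpName));
        return true;
    }

    // Retry wrapper: the task stays RUNNING between attempts and is marked
    // FAILED only after all maxAttempts attempts have returned false.
    static TaskState runWithRetry(BooleanSupplier attempt, int maxAttempts, List<TaskState> observed) {
        for (int i = 0; i < maxAttempts; i++) {
            observed.add(TaskState.RUNNING);       // state visible while this attempt runs
            if (attempt.getAsBoolean()) {
                return TaskState.SUCCESS;
            }
        }
        return TaskState.FAILED;
    }

    public static void main(String[] args) {
        catalog.put("mv1", new ArrayList<>(List.of("old")));
        int[] calls = {0};
        List<TaskState> seen = new ArrayList<>();
        // The first two attempts fail; the third succeeds.
        TaskState result = runWithRetry(
                () -> refresh("mv1", ++calls[0] < 3 ? null : List.of("a", "b")), 3, seen);
        System.out.println(result + " " + catalog.get("mv1") + " " + seen);
    }
}
```

Running `main` prints `SUCCESS [a, b] [RUNNING, RUNNING, RUNNING]`. The first two attempts fail and leave `mv1` holding its old rows. The task never reports `FAILED` in between, and the new rows appear only after the final swap.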




For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] adonis0147 commented on a diff in pull request #16006: [fix](MTMV) Refine the process of refreshing data

Posted by GitBox <gi...@apache.org>.
adonis0147 commented on code in PR #16006:
URL: https://github.com/apache/doris/pull/16006#discussion_r1071921323


##########
fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java:
##########
@@ -137,29 +137,17 @@ public void processDropMaterializedView(DropMaterializedViewStmt stmt) throws Dd
         if (!stmt.isForMTMV() && stmt.getTableName() == null) {
             throw new DdlException("Drop materialized view without table name is unsupported : " + stmt.toSql());
         }
-        TableName tableName = !stmt.isForMTMV() ? stmt.getTableName() : stmt.getMTMVName();
-        Database db;
-        OlapTable olapTable;
-        if (stmt.isIfExists()) {
-            try {
-                String dbName = tableName.getDb();
-                db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
-                String name = tableName.getTbl();
-                olapTable = (OlapTable) db.getTableOrMetaException(name,
-                        !stmt.isForMTMV() ? TableType.OLAP : TableType.MATERIALIZED_VIEW);
-            } catch (Exception e) {
-                LOG.info("db or table not exists, msg={}", e.getMessage());

Review Comment:
   > DROP MATERIALIZED VIEW if exists mv;
   > 
   > If mv does not exist, can the above SQL return OK?
   
   Yes.
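The reply confirms that `DROP MATERIALIZED VIEW IF EXISTS` should succeed when the view does not exist. The following is a minimal sketch of that contract, assuming a toy in-memory set of view names. `DropIfExistsSketch`, `DdlError`, and the error text are hypothetical and do not reflect the actual `Alter.processDropMaterializedView` implementation.

```java
import java.util.HashSet;
import java.util.Set;

public class DropIfExistsSketch {
    // Hypothetical stand-in for a DDL failure such as Doris's DdlException.
    static class DdlError extends Exception {
        DdlError(String msg) { super(msg); }
    }

    // Toy catalog of existing materialized view names.
    static final Set<String> views = new HashSet<>();

    // Returns true if a view was dropped. With ifExists, a missing view is a
    // no-op that still succeeds; without it, the missing view is an error.
    static boolean drop(String name, boolean ifExists) throws DdlError {
        if (!views.remove(name)) {
            if (ifExists) {
                return false;
            }
            throw new DdlError("Unknown materialized view: " + name);
        }
        return true;
    }

    public static void main(String[] args) throws DdlError {
        views.add("mv");
        System.out.println(drop("mv", true));   // true: dropped
        System.out.println(drop("mv", true));   // false: already gone, no error
        try {
            drop("mv", false);
        } catch (DdlError e) {
            System.out.println(e.getMessage()); // Unknown materialized view: mv
        }
    }
}
```

With `IF EXISTS`, dropping a missing view is a silent no-op. Without it, the same call raises an error. One practical consequence is that a caller can issue such a drop unconditionally, for example to clear out a leftover temporary view, without first checking whether it exists.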


