You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/12/12 09:56:14 UTC

[GitHub] [doris] adonis0147 commented on a diff in pull request #14944: [feature-wip](MTMV)impl import data to multi table materialized view table

adonis0147 commented on code in PR #14944:
URL: https://github.com/apache/doris/pull/14944#discussion_r1045573358


##########
fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java:
##########
@@ -79,4 +79,5 @@ public class FeConstants {
     public static String FS_PREFIX_HDFS = "hdfs";
     public static String FS_PREFIX_FILE = "file";
     public static final String INTERNAL_DB_NAME = "__internal_schema";
+    public static String TEMP_MATERIZLIZE_DVIEW_PREFIX = "doris_internal_tmp_multi_view_";

Review Comment:
   I don't think the name `multi view` is suitable, because we only create one temp view for each job.



##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java:
##########
@@ -23,21 +23,31 @@
 import org.apache.doris.analysis.MVRefreshIntervalTriggerInfo;
 import org.apache.doris.analysis.MVRefreshTriggerInfo;
 import org.apache.doris.catalog.MaterializedView;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.mtmv.MTMVUtils.TriggerMode;
 import org.apache.doris.mtmv.metadata.MTMVJob;
 import org.apache.doris.mtmv.metadata.MTMVJob.JobSchedule;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
 public class MTMVJobFactory {
+    private static final Logger LOG = LogManager.getLogger(MTMVTaskProcessor.class);
+
     public static boolean isGenerateJob(MaterializedView materializedView) {
-        boolean completeRefresh =  materializedView.getRefreshInfo().getRefreshMethod() == RefreshMethod.COMPLETE;
+        boolean completeRefresh = materializedView.getRefreshInfo().getRefreshMethod() == RefreshMethod.COMPLETE;
         BuildMode buildMode = materializedView.getBuildMode();
-        MVRefreshTriggerInfo triggerInfo =  materializedView.getRefreshInfo().getTriggerInfo();
+        MVRefreshTriggerInfo triggerInfo = materializedView.getRefreshInfo().getTriggerInfo();
+        //cannnot generate job when create temp view table

Review Comment:
   ```suggestion
           //can not generate a job when creating a temp materialized view.
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskProcessor.java:
##########
@@ -17,13 +17,249 @@
 
 package org.apache.doris.mtmv;
 
+import org.apache.doris.analysis.SqlParser;
+import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedView;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.SqlParserUtils;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.mtmv.MTMVUtils.TaskState;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+
 public class MTMVTaskProcessor {
     private static final Logger LOG = LogManager.getLogger(MTMVTaskProcessor.class);
+    private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
+    private ConnectContext context;
 
     void process(MTMVTaskContext context) throws Exception {
-        LOG.info("run mv logic here.");
+        String taskId = context.getTask().getTaskId();
+        long jobId = context.getJob().getId();
+        LOG.info("run mv logic start, task_id:{}, jobid:{}", taskId, jobId);

Review Comment:
   I don't think the log message `run mv logic start` is suitable.



##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskProcessor.java:
##########
@@ -17,13 +17,249 @@
 
 package org.apache.doris.mtmv;
 
+import org.apache.doris.analysis.SqlParser;
+import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedView;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.SqlParserUtils;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.mtmv.MTMVUtils.TaskState;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+
 public class MTMVTaskProcessor {
     private static final Logger LOG = LogManager.getLogger(MTMVTaskProcessor.class);
+    private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
+    private ConnectContext context;
 
     void process(MTMVTaskContext context) throws Exception {
-        LOG.info("run mv logic here.");
+        String taskId = context.getTask().getTaskId();
+        long jobId = context.getJob().getId();
+        LOG.info("run mv logic start, task_id:{}, jobid:{}", taskId, jobId);
+        String tableName = context.getTask().getMvName();
+        String tmpTableName = genTmpTableName(tableName);
+        DatabaseIf db = Env.getCurrentEnv().getCatalogMgr().getCatalog(InternalCatalog.INTERNAL_CATALOG_NAME)
+                .getDbOrAnalysisException(context.getTask().getDbName());
+        MaterializedView table = (MaterializedView) db.getTableOrAnalysisException(tableName);
+        if (!table.tryMvTaskLock()) {
+            LOG.warn("run mv task  failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, "get lock fail");

Review Comment:
   ```suggestion
               LOG.warn("run MTMV task failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, "get lock fail");
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java:
##########
@@ -120,6 +120,16 @@ default T getTableOrMetaException(String tableName, TableIf.TableType tableType)
         return table;
     }
 
+    default T getTableViewOrMetaException(String tableName) throws MetaNotFoundException {
+        T table = getTableOrMetaException(tableName);
+        if (table.getType() != TableIf.TableType.OLAP && table.getType() != TableIf.TableType.MATERIALIZED_VIEW) {
+            throw new MetaNotFoundException(
+                    "table type is not olap or materialized view, tableName=" + tableName + ", type="
+                            + table.getType());
+        }
+        return table;
+    }
+

Review Comment:
   I think the following interface is better because it is more generic.
   ```suggestion
       default T getTableOrMetaException(String tableName, List<TableIf.TableType> tableTypes)
               throws MetaNotFoundException {
           T table = getTableOrMetaException(tableName);
           if (!tableTypes.contains(table.getType())) {
               throw new MetaNotFoundException(
                       "Tye type of " + tableName + " doesn't match, expected data tables=" + tableTypes);
           }
           return table;
       }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java:
##########
@@ -68,7 +68,9 @@ public void executeTask(MTMVTaskExecutor taskExecutor) {
             if (!isSuccess) {
                 task.setState(TaskState.FAILED);
                 task.setErrorCode(-1);
-                task.setMessage(lastExceptionString);
+                if (lastExceptionString.length() > 0) {

Review Comment:
   Is this branch necessary?



##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskProcessor.java:
##########
@@ -17,13 +17,249 @@
 
 package org.apache.doris.mtmv;
 
+import org.apache.doris.analysis.SqlParser;
+import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedView;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.SqlParserUtils;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.mtmv.MTMVUtils.TaskState;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+
 public class MTMVTaskProcessor {
     private static final Logger LOG = LogManager.getLogger(MTMVTaskProcessor.class);
+    private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
+    private ConnectContext context;
 
     void process(MTMVTaskContext context) throws Exception {
-        LOG.info("run mv logic here.");
+        String taskId = context.getTask().getTaskId();
+        long jobId = context.getJob().getId();
+        LOG.info("run mv logic start, task_id:{}, jobid:{}", taskId, jobId);
+        String tableName = context.getTask().getMvName();
+        String tmpTableName = genTmpTableName(tableName);
+        DatabaseIf db = Env.getCurrentEnv().getCatalogMgr().getCatalog(InternalCatalog.INTERNAL_CATALOG_NAME)
+                .getDbOrAnalysisException(context.getTask().getDbName());
+        MaterializedView table = (MaterializedView) db.getTableOrAnalysisException(tableName);
+        if (!table.tryMvTaskLock()) {
+            LOG.warn("run mv task  failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, "get lock fail");
+            return;
+        }
+        try {
+            //step1 create tmp table
+            String tmpCreateTableStmt = genCreateTempMaterializedViewStmt(context, tableName, tmpTableName);
+            //check whther tmp table exists, if exists means run mv task failed before, so need to drop it first
+            if (db.isTableExist(tmpTableName)) {
+                String dropStml = genDropStml(context, tmpTableName);
+                ConnectContext dropResult = execSQL(context, dropStml);
+                LOG.info("exec drop table stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, dropStml,
+                        dropResult.getState(), dropResult.getState().getInfoMessage());
+            }
+            ConnectContext createTempTableResult = execSQL(context, tmpCreateTableStmt);
+            LOG.info("exec tmp table stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, tmpCreateTableStmt,
+                    createTempTableResult.getState(), createTempTableResult.getState().getInfoMessage());
+            if (createTempTableResult.getState().getStateType() != QueryState.MysqlStateType.OK) {
+                throw new Throwable("create tmp table failed, sql:" + tmpCreateTableStmt);
+            }
+
+            //step2 insert data to tmp table
+            String insertStmt = genInsertIntoStmt(context, tmpTableName);
+            ConnectContext insertDataResult = execSQL(context, insertStmt);
+            LOG.info("exec insert into stmt, taskid:{}, stmt:{}, ret:{}, msg:{}, effected_row:{}", taskId, insertStmt,
+                    insertDataResult.getState(), insertDataResult.getState().getInfoMessage(),
+                    insertDataResult.getState().getAffectedRows());
+            if (insertDataResult.getState().getStateType() != QueryState.MysqlStateType.OK) {
+                throw new Throwable("insert data failed, sql:" + insertStmt);
+            }
+
+            //step3 swap tmp table with origin table
+            String swapStmt = genSwapStmt(context, tableName, tmpTableName);
+            ConnectContext swapResult = execSQL(context, swapStmt);
+            LOG.info("exec swap stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, swapStmt, swapResult.getState(),
+                    swapResult.getState().getInfoMessage());
+            if (swapResult.getState().getStateType() != QueryState.MysqlStateType.OK) {
+                throw new Throwable("swap table failed, sql:" + swapStmt);
+            }
+            //step4 update task info
+            context.getTask().setMessage(insertDataResult.getState().getInfoMessage());
+            context.getTask().setState(TaskState.SUCCESS);
+            LOG.info("run mv task success, task_id:{},jobid:{}", taskId, jobId);
+        } catch (AnalysisException e) {
+            LOG.warn("run mv task failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, e.getMessage());
+            context.getTask().setMessage("run task failed, caused by " + e.getMessage());
+            context.getTask().setState(TaskState.FAILED);
+        } catch (Throwable e) {
+            LOG.warn("run mv task failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, e.getMessage());
+            context.getTask().setMessage("run task failed, caused by " + e.getMessage());
+            context.getTask().setState(TaskState.FAILED);
+        } finally {
+            context.getTask().setFinishTime(MTMVUtils.getNowTimeStamp());
+            table.mvTaskUnLock();
+            //double check
+            if (db.isTableExist(tmpTableName)) {
+                String dropStml = genDropStml(context, tmpTableName);
+                ConnectContext dropResult = execSQL(context, dropStml);
+                LOG.info("exec drop table stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, dropStml,
+                        dropResult.getState(), dropResult.getState().getInfoMessage());
+            }
+        }
+    }
+
+    private String genDropStml(MTMVTaskContext context, String tableName) {
+        String stmt = "DROP MATERIALIZED VIEW  if exists " + tableName;
+        LOG.info("gen drop stmt, taskid:{}, stmt:{}", context.getTask().getTaskId(), stmt);
+        return stmt;
+    }
+
+    private String genTmpTableName(String tableName) {
+        String tmpTableName = FeConstants.TEMP_MATERIZLIZE_DVIEW_PREFIX + tableName;
+        return tmpTableName;
+    }
+
+    // ALTER TABLE t1 REPLACE WITH TABLE t1_mirror PROPERTIES('swap' = 'false');
+    private String genSwapStmt(MTMVTaskContext context, String tableName, String tmpTableName) {
+        String stmt = "ALTER TABLE " + tableName + " REPLACE WITH TABLE " + tmpTableName
+                + " PROPERTIES('swap' = 'false');";
+        LOG.info("gen swap stmt, taskid:{}, stmt:{}", context.getTask().getTaskId(), stmt);
+        return stmt;
+    }
+
+    private String genInsertIntoStmt(MTMVTaskContext context, String tmpTableName) {
+        String query = context.getQuery();
+        String stmt = "insert into " + tmpTableName + " " + query;
+        stmt = stmt.replaceAll(SystemInfoService.DEFAULT_CLUSTER + ":", "");
+        LOG.info("gen insert into stmt, taskid:{}, stmt:{}", context.getTask().getTaskId(), stmt);
+        return stmt;
+    }
+
+    private String genCreateTempMaterializedViewStmt(MTMVTaskContext context, String tableName, String tmpTableName) {
+        try {
+            String dbName = context.getTask().getDbName();
+            String originViewStmt = getCreateViewStmt(dbName, tableName);
+            String tmpViewStmt = convertCreateViewStmt(originViewStmt, tmpTableName);
+            LOG.info("gen tmp table stmt, taskid:{}, originstml:{},  stmt:{}", context.getTask().getTaskId(),
+                    originViewStmt.replaceAll("\n", " "), tmpViewStmt);
+            return tmpViewStmt;
+        } catch (Throwable e) {
+            LOG.warn("fail to gen tmp table stmt, taskid:{}, msg:{}", context.getTask().getTaskId(), e.getMessage());
+            return "";
+        }
+    }
+
+    //Generate temporary view table statement
+    private String convertCreateViewStmt(String stmt, String tmpTable) {
+        stmt = stmt.replace("`", "");
+        String regex = "CREATE MATERIALIZED VIEW.*\n";
+        String replacement = "CREATE MATERIALIZED VIEW " + tmpTable + "\n";
+        stmt = stmt.replaceAll(regex, replacement);
+        // regex = "BUILD.*\n";
+        // stmt = stmt.replaceAll(regex, " BUILD deferred never REFRESH \n");
+        stmt = stmt.replaceAll("\n", " ");
+        stmt = stmt.replaceAll(SystemInfoService.DEFAULT_CLUSTER + ":", "");
+        return stmt;
+    }
+
+    // get origin table create stmt from env
+    private String getCreateViewStmt(String dbName, String tableName) throws AnalysisException {
+        ConnectContext ctx = new ConnectContext();
+        ctx.setEnv(Env.getCurrentEnv());
+        DatabaseIf db = ctx.getEnv().getCatalogMgr().getCatalog(InternalCatalog.INTERNAL_CATALOG_NAME)
+                .getDbOrAnalysisException(dbName);
+        TableIf table = db.getTableOrAnalysisException(tableName);
+        table.readLock();
+        try {
+            List<String> createTableStmt = Lists.newArrayList();
+            Env.getDdlStmt(table, createTableStmt, null, null, false, true /* hide password */, -1L);
+            if (createTableStmt.isEmpty()) {
+                return "";
+            }
+            return createTableStmt.get(0);
+        } catch (Throwable e) {
+            //throw new AnalysisException(e.getMessage());
+        } finally {
+            table.readUnlock();
+        }
+        return "";
+    }
+
+    private ConnectContext execSQL(MTMVTaskContext context, String originStmt) throws AnalysisException, DdlException {
+        ConnectContext ctx = new ConnectContext();
+        ctx.setEnv(Env.getCurrentEnv());
+        ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);
+        ctx.setThreadLocalInfo();
+        String fullDbName = ClusterNamespace
+                .getFullName(SystemInfoService.DEFAULT_CLUSTER, context.getTask().getDbName());
+        ctx.setDatabase(fullDbName);
+        ctx.setQualifiedUser("root");
+        ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp("root", "%"));
+        ctx.getState().reset();
+
+        List<StatementBase> stmts = null;
+        StatementBase parsedStmt = null;
+        stmts = parse(ctx, originStmt);
+        parsedStmt = stmts.get(0);
+        try {
+            StmtExecutor executor = new StmtExecutor(ctx, parsedStmt);
+            ctx.setExecutor(executor);
+            executor.execute();
+        } catch (IOException e) {
+            LOG.warn("execSQL failed, taskid:{}, msg:{}, stmt:{}", context.getTask().getTaskId(), e.getMessage(),
+                    originStmt);
+        } catch (UserException e) {
+            LOG.warn("execSQL failed, taskid:{}, msg:{}, stmt:{}", context.getTask().getTaskId(), e.getMessage(),
+                    originStmt);
+        } catch (Throwable e) {
+            LOG.warn("execSQL failed, taskid:{}, msg:{}, stmt:{}", context.getTask().getTaskId(), e.getMessage(),
+                    originStmt);

Review Comment:
   We can merge these exception handlers into a single `catch (Throwable e)` clause, since all three branches do the same thing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org