You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@doris.apache.org by GitBox <gi...@apache.org> on 2018/11/15 13:04:31 UTC

[GitHub] morningman closed pull request #313: scheduler routine load job for stream load

morningman closed pull request #313: scheduler routine load job for stream load
URL: https://github.com/apache/incubator-doris/pull/313
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/fe/pom.xml b/fe/pom.xml
index d7227a76..874e4273 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -517,6 +517,12 @@ under the License.
             <version>1.7.5</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>2.0.0</version>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup
index b5a038ff..9bcb843e 100644
--- a/fe/src/main/cup/sql_parser.cup
+++ b/fe/src/main/cup/sql_parser.cup
@@ -957,7 +957,7 @@ help_stmt ::=
 
 // Export statement
 export_stmt ::=
-    // KW_EXPORT KW_TABLE table_name:tblName opt_using_partition:partitions 
+    // KW_EXPORT KW_TABLE table_name:tblName opt_using_partition:partitions
     KW_EXPORT KW_TABLE base_table_ref:tblRef
     KW_TO STRING_LITERAL:path
     opt_properties:properties
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index 4d445cc6..5b57002d 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -135,6 +135,7 @@
 import org.apache.doris.load.LoadErrorHub;
 import org.apache.doris.load.LoadJob;
 import org.apache.doris.load.LoadJob.JobState;
+import org.apache.doris.load.routineload.RoutineLoad;
 import org.apache.doris.master.Checkpoint;
 import org.apache.doris.master.MetaHelper;
 import org.apache.doris.metric.MetricRepo;
@@ -240,6 +241,7 @@
     private ConcurrentHashMap<String, Cluster> nameToCluster;
 
     private Load load;
+    private RoutineLoad routineLoad;
     private ExportMgr exportMgr;
     private Clone clone;
     private Alter alter;
@@ -311,7 +313,7 @@
 
     private PullLoadJobMgr pullLoadJobMgr;
     private BrokerMgr brokerMgr;
-    
+
     private GlobalTransactionMgr globalTransactionMgr;
 
     private DeployManager deployManager;
@@ -370,6 +372,7 @@ private Catalog() {
         this.idToDb = new ConcurrentHashMap<>();
         this.fullNameToDb = new ConcurrentHashMap<>();
         this.load = new Load();
+        this.routineLoad = new RoutineLoad();
         this.exportMgr = new ExportMgr();
         this.clone = new Clone();
         this.alter = new Alter();
@@ -419,7 +422,7 @@ private Catalog() {
 
         this.auth = new PaloAuth();
         this.domainResolver = new DomainResolver(auth);
-        
+
         this.esStateStore = new EsStateStore();
     }
 
@@ -457,11 +460,11 @@ public PullLoadJobMgr getPullLoadJobMgr() {
     public BrokerMgr getBrokerMgr() {
         return brokerMgr;
     }
-    
+
     public static GlobalTransactionMgr getCurrentGlobalTransactionMgr() {
         return getCurrentCatalog().globalTransactionMgr;
     }
-    
+
     public GlobalTransactionMgr getGlobalTransactionMgr() {
         return globalTransactionMgr;
     }
@@ -572,20 +575,20 @@ public void initialize(String[] args) throws Exception {
         // 5. create es state store
         esStateStore.loadTableFromCatalog();
         esStateStore.start();
-        
+
         // 6. start state listener thread
         createStateListener();
         listener.setName("stateListener");
         listener.setInterval(STATE_CHANGE_CHECK_INTERVAL_MS);
         listener.start();
-        
+
         // 7. start txn cleaner thread
         createTxnCleaner();
         txnCleaner.setName("txnCleaner");
         // the clear threads runs every min(transaction_clean_interval_second,stream_load_default_timeout_second)/10
-        txnCleaner.setInterval(Math.min(Config.transaction_clean_interval_second, 
+        txnCleaner.setInterval(Math.min(Config.transaction_clean_interval_second,
                 Config.stream_load_default_timeout_second) * 100L);
-        
+
     }
 
     private void getClusterIdAndRole() throws IOException {
@@ -615,7 +618,7 @@ private void getClusterIdAndRole() throws IOException {
             if (!roleFile.exists()) {
                 // The very first time to start the first node of the cluster.
                 // It should became a Master node (Master node's role is also FOLLOWER, which means electable)
-                
+
                 // For compatibility. Because this is the very first time to start, so we arbitrarily choose
                 // a new name for this node
                 role = FrontendNodeType.FOLLOWER;
@@ -673,7 +676,7 @@ private void getClusterIdAndRole() throws IOException {
         } else {
             // try to get role and node name from helper node,
             // this loop will not end until we get certain role type and name
-            while(true) {
+            while (true) {
                 if (!getFeNodeTypeAndNameFromHelpers()) {
                     LOG.warn("current node is not added to the group. please add it first. "
                             + "sleep 5 seconds and retry, current helper nodes: {}", helperNodes);
@@ -700,7 +703,7 @@ private void getClusterIdAndRole() throws IOException {
             Pair<String, Integer> rightHelperNode = helperNodes.get(0);
 
             Storage storage = new Storage(IMAGE_DIR);
-            if (roleFile.exists() && (role != storage.getRole() || !nodeName.equals(storage.getNodeName())) 
+            if (roleFile.exists() && (role != storage.getRole() || !nodeName.equals(storage.getNodeName()))
                     || !roleFile.exists()) {
                 storage.writeFrontendRoleAndNodeName(role, nodeName);
             }
@@ -773,7 +776,7 @@ private void getClusterIdAndRole() throws IOException {
 
         Preconditions.checkState(helperNodes.size() == 1);
         LOG.info("finished to get cluster id: {}, role: {} and node name: {}",
-                 clusterId, role.name(), nodeName);
+                clusterId, role.name(), nodeName);
     }
 
     public static String genFeNodeName(String host, int port, boolean isOldStyle) {
@@ -799,7 +802,7 @@ private boolean getFeNodeTypeAndNameFromHelpers() {
                 conn = (HttpURLConnection) url.openConnection();
                 if (conn.getResponseCode() != 200) {
                     LOG.warn("failed to get fe node type from helper node: {}. response code: {}",
-                             helperNode, conn.getResponseCode());
+                            helperNode, conn.getResponseCode());
                     continue;
                 }
 
@@ -1002,10 +1005,10 @@ private void transferToMaster() throws IOException {
         // Clone checker
         CloneChecker.getInstance().setInterval(Config.clone_checker_interval_second * 1000L);
         CloneChecker.getInstance().start();
-        
+
         // Publish Version Daemon
         publishVersionDaemon.start();
-        
+
         // Start txn cleaner
         txnCleaner.start();
 
@@ -1234,7 +1237,7 @@ public void loadImage(String imageDir) throws IOException, DdlException {
         long loadImageEndTime = System.currentTimeMillis();
         LOG.info("finished load image in " + (loadImageEndTime - loadImageStartTime) + " ms");
     }
-    
+
     private void recreateTabletInvertIndex() {
         if (isCheckpointThread()) {
             return;
@@ -1390,7 +1393,7 @@ public long loadLoadJob(DataInputStream dis, long checksum) throws IOException,
             param.readFields(dis);
             load.setLoadErrorHubInfo(param);
         }
-        
+
         if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_45) {
             // 4. load delete jobs
             int deleteJobSize = dis.readInt();
@@ -1398,14 +1401,14 @@ public long loadLoadJob(DataInputStream dis, long checksum) throws IOException,
             for (int i = 0; i < deleteJobSize; i++) {
                 long dbId = dis.readLong();
                 newChecksum ^= dbId;
-    
+
                 int deleteJobCount = dis.readInt();
                 newChecksum ^= deleteJobCount;
                 for (int j = 0; j < deleteJobCount; j++) {
                     LoadJob job = new LoadJob();
                     job.readFields(dis);
                     long currentTimeMs = System.currentTimeMillis();
-    
+
                     // Delete the history load jobs that are older than
                     // LABEL_KEEP_MAX_MS
                     // This job must be FINISHED or CANCELLED
@@ -1478,7 +1481,7 @@ public long loadAlterJob(DataInputStream dis, long checksum, JobType type) throw
             // init job
             Database db = getDb(job.getDbId());
             // should check job state here because the job is finished but not removed from alter jobs list
-            if (db != null && (job.getState() == org.apache.doris.alter.AlterJob.JobState.PENDING 
+            if (db != null && (job.getState() == org.apache.doris.alter.AlterJob.JobState.PENDING
                     || job.getState() == org.apache.doris.alter.AlterJob.JobState.RUNNING)) {
                 job.replayInitJob(db);
             }
@@ -1532,7 +1535,7 @@ public long loadBackupAndRestoreJob_D(DataInputStream dis, long checksum) throws
 
     @Deprecated
     private long loadBackupAndRestoreJob_D(DataInputStream dis, long checksum,
-            Class<? extends AbstractBackupJob_D> jobClass) throws IOException {
+                                           Class<? extends AbstractBackupJob_D> jobClass) throws IOException {
         int size = dis.readInt();
         long newChecksum = checksum ^ size;
         for (int i = 0; i < size; i++) {
@@ -1592,14 +1595,14 @@ public long loadAccessService(DataInputStream dis, long checksum) throws IOExcep
             long newChecksum = checksum ^ size;
             UserPropertyMgr tmpUserPropertyMgr = new UserPropertyMgr();
             tmpUserPropertyMgr.readFields(dis);
-            
+
             // transform it. the old UserPropertyMgr is deprecated
             tmpUserPropertyMgr.transform(auth);
             return newChecksum;
         }
         return checksum;
     }
-    
+
     public long loadTransactionState(DataInputStream dis, long checksum) throws IOException {
         int size = dis.readInt();
         long newChecksum = checksum ^ size;
@@ -1669,7 +1672,7 @@ public void saveImage(File curFile, long replayedJournalId) throws IOException {
 
         long saveImageEndTime = System.currentTimeMillis();
         LOG.info("finished save image {} in {} ms. checksum is {}",
-                 curFile.getAbsolutePath(), (saveImageEndTime - saveImageStartTime), checksum);
+                curFile.getAbsolutePath(), (saveImageEndTime - saveImageStartTime), checksum);
     }
 
     public long saveHeader(DataOutputStream dos, long replayedJournalId, long checksum) throws IOException {
@@ -1785,7 +1788,7 @@ public long saveLoadJob(DataOutputStream dos, long checksum) throws IOException
         // 3. load error hub info
         LoadErrorHub.Param param = load.getLoadErrorHubInfo();
         param.write(dos);
-        
+
         // 4. save delete load job info
         Map<Long, List<LoadJob>> dbToDeleteJobs = load.getDbToDeleteJobs();
         int deleteJobSize = dbToDeleteJobs.size();
@@ -1873,7 +1876,7 @@ public long savePaloAuth(DataOutputStream dos, long checksum) throws IOException
         auth.write(dos);
         return checksum;
     }
-    
+
     public long saveTransactionState(DataOutputStream dos, long checksum) throws IOException {
         int size = globalTransactionMgr.getTransactionNum();
         checksum ^= size;
@@ -1914,7 +1917,7 @@ protected void runOneCycle() {
             }
         };
     }
-    
+
     public void createTxnCleaner() {
         txnCleaner = new Daemon() {
             protected void runOneCycle() {
@@ -2008,85 +2011,85 @@ protected void runOneCycle() {
 
                 if (formerFeType == FrontendNodeType.INIT) {
                     switch (feType) {
-                    case MASTER: {
-                        try {
-                            transferToMaster();
-                        } catch (IOException e) {
-                            e.printStackTrace();
+                        case MASTER: {
+                            try {
+                                transferToMaster();
+                            } catch (IOException e) {
+                                e.printStackTrace();
+                            }
+                            break;
                         }
-                        break;
-                    }
-                    case UNKNOWN:
-                    case FOLLOWER:
-                    case OBSERVER: {
-                        transferToNonMaster();
-                        break;
-                    }
-                    default:
-                        break;
+                        case UNKNOWN:
+                        case FOLLOWER:
+                        case OBSERVER: {
+                            transferToNonMaster();
+                            break;
+                        }
+                        default:
+                            break;
                     }
                     return;
                 }
 
                 if (formerFeType == FrontendNodeType.UNKNOWN) {
                     switch (feType) {
-                    case MASTER: {
-                        try {
-                            transferToMaster();
-                        } catch (IOException e) {
-                            e.printStackTrace();
+                        case MASTER: {
+                            try {
+                                transferToMaster();
+                            } catch (IOException e) {
+                                e.printStackTrace();
+                            }
+                            break;
                         }
-                        break;
-                    }
-                    case FOLLOWER:
-                    case OBSERVER: {
-                        transferToNonMaster();
-                        break;
-                    }
-                    default:
+                        case FOLLOWER:
+                        case OBSERVER: {
+                            transferToNonMaster();
+                            break;
+                        }
+                        default:
                     }
                     return;
                 }
 
                 if (formerFeType == FrontendNodeType.FOLLOWER) {
                     switch (feType) {
-                    case MASTER: {
-                        try {
-                            transferToMaster();
-                        } catch (IOException e) {
-                            e.printStackTrace();
+                        case MASTER: {
+                            try {
+                                transferToMaster();
+                            } catch (IOException e) {
+                                e.printStackTrace();
+                            }
+                            break;
                         }
-                        break;
-                    }
-                    case UNKNOWN:
-                    case OBSERVER: {
-                        transferToNonMaster();
-                        break;
-                    }
-                    default:
+                        case UNKNOWN:
+                        case OBSERVER: {
+                            transferToNonMaster();
+                            break;
+                        }
+                        default:
                     }
                     return;
                 }
 
                 if (formerFeType == FrontendNodeType.OBSERVER) {
                     switch (feType) {
-                    case UNKNOWN: {
-                        transferToNonMaster();
+                        case UNKNOWN: {
+                            transferToNonMaster();
                             break;
-                    }
-                    default:
+                        }
+                        default:
                     }
                     return;
                 }
 
                 if (formerFeType == FrontendNodeType.MASTER) {
                     switch (feType) {
-                    case UNKNOWN:
-                    case FOLLOWER:
-                    case OBSERVER: {
-                        System.exit(-1);
-                    }
-                    default:
+                        case UNKNOWN:
+                        case FOLLOWER:
+                        case OBSERVER: {
+                            System.exit(-1);
+                        }
+                        default:
                     }
                     return;
                 }
@@ -2650,7 +2653,7 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa
     }
 
     public Pair<Long, Partition> addPartition(Database db, String tableName, OlapTable givenTable,
-            AddPartitionClause addPartitionClause, boolean isRestore) throws DdlException {
+                                              AddPartitionClause addPartitionClause, boolean isRestore) throws DdlException {
         SingleRangePartitionDesc singlePartitionDesc = addPartitionClause.getSingeRangePartitionDesc();
         DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc();
 
@@ -2768,18 +2771,18 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa
         try {
             long partitionId = getNextId();
             Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(),
-                                                             olapTable.getId(),
-                                                             partitionId, partitionName,
-                                                             indexIdToShortKeyColumnCount,
-                                                             indexIdToSchemaHash,
-                                                             indexIdToStorageType,
-                                                             indexIdToSchema,
-                                                             olapTable.getKeysType(),
-                                                             distributionInfo,
-                                                             dataProperty.getStorageMedium(),
-                                                             singlePartitionDesc.getReplicationNum(),
-                                                             versionInfo, bfColumns, olapTable.getBfFpp(),
-                                                             tabletIdSet, isRestore);
+                    olapTable.getId(),
+                    partitionId, partitionName,
+                    indexIdToShortKeyColumnCount,
+                    indexIdToSchemaHash,
+                    indexIdToStorageType,
+                    indexIdToSchema,
+                    olapTable.getKeysType(),
+                    distributionInfo,
+                    dataProperty.getStorageMedium(),
+                    singlePartitionDesc.getReplicationNum(),
+                    versionInfo, bfColumns, olapTable.getBfFpp(),
+                    tabletIdSet, isRestore);
 
             // check again
             db.writeLock();
@@ -3125,13 +3128,13 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long
                         long backendId = replica.getBackendId();
                         countDownLatch.addMark(backendId, tabletId);
                         CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tableId,
-                                                                       partitionId, indexId, tabletId,
-                                                                       shortKeyColumnCount, schemaHash,
-                                                                       version, versionHash,
-                                                                       keysType,
-                                                                       storageType, storageMedium,
-                                                                       schema, bfColumns, bfFpp,
-                                                                       countDownLatch);
+                                partitionId, indexId, tabletId,
+                                shortKeyColumnCount, schemaHash,
+                                version, versionHash,
+                                keysType,
+                                storageType, storageMedium,
+                                schema, bfColumns, bfFpp,
+                                countDownLatch);
                         batchTask.addTask(task);
                         // add to AgentTaskQueue for handling finish report.
                         // not for resending task
@@ -3298,7 +3301,7 @@ private Table createOlapTable(Database db, CreateTableStmt stmt, boolean isResto
                 DataProperty dataProperty = null;
                 try {
                     dataProperty = PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(),
-                                                                        DataProperty.DEFAULT_HDD_DATA_PROPERTY);
+                            DataProperty.DEFAULT_HDD_DATA_PROPERTY);
                 } catch (AnalysisException e) {
                     throw new DdlException(e.getMessage());
                 }
@@ -3316,18 +3319,18 @@ private Table createOlapTable(Database db, CreateTableStmt stmt, boolean isResto
 
                 // create partition
                 Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(),
-                                                                 olapTable.getId(),
-                                                                 partitionId, partitionName,
-                                                                 olapTable.getIndexIdToShortKeyColumnCount(),
-                                                                 olapTable.getIndexIdToSchemaHash(),
-                                                                 olapTable.getIndexIdToStorageType(),
-                                                                 olapTable.getIndexIdToSchema(),
-                                                                 keysType,
-                                                                 distributionInfo,
-                                                                 dataProperty.getStorageMedium(),
-                                                                 replicationNum,
-                                                                 versionInfo, bfColumns, bfFpp,
-                                                                 tabletIdSet, isRestore);
+                        olapTable.getId(),
+                        partitionId, partitionName,
+                        olapTable.getIndexIdToShortKeyColumnCount(),
+                        olapTable.getIndexIdToSchemaHash(),
+                        olapTable.getIndexIdToStorageType(),
+                        olapTable.getIndexIdToSchema(),
+                        keysType,
+                        distributionInfo,
+                        dataProperty.getStorageMedium(),
+                        replicationNum,
+                        versionInfo, bfColumns, bfFpp,
+                        tabletIdSet, isRestore);
                 olapTable.addPartition(partition);
             } else if (partitionInfo.getType() == PartitionType.RANGE) {
                 try {
@@ -3349,16 +3352,16 @@ private Table createOlapTable(Database db, CreateTableStmt stmt, boolean isResto
                 for (Map.Entry<String, Long> entry : partitionNameToId.entrySet()) {
                     DataProperty dataProperty = rangePartitionInfo.getDataProperty(entry.getValue());
                     Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
-                                                                     entry.getValue(), entry.getKey(),
-                                                                     olapTable.getIndexIdToShortKeyColumnCount(),
-                                                                     olapTable.getIndexIdToSchemaHash(),
-                                                                     olapTable.getIndexIdToStorageType(),
-                                                                     olapTable.getIndexIdToSchema(),
-                                                                     keysType, distributionInfo,
-                                                                     dataProperty.getStorageMedium(),
-                                                                     partitionInfo.getReplicationNum(entry.getValue()),
-                                                                     versionInfo, bfColumns, bfFpp,
-                                                                     tabletIdSet, isRestore);
+                            entry.getValue(), entry.getKey(),
+                            olapTable.getIndexIdToShortKeyColumnCount(),
+                            olapTable.getIndexIdToSchemaHash(),
+                            olapTable.getIndexIdToStorageType(),
+                            olapTable.getIndexIdToSchema(),
+                            keysType, distributionInfo,
+                            dataProperty.getStorageMedium(),
+                            partitionInfo.getReplicationNum(entry.getValue()),
+                            versionInfo, bfColumns, bfFpp,
+                            tabletIdSet, isRestore);
                     olapTable.addPartition(partition);
                 }
             } else {
@@ -3409,14 +3412,14 @@ private Table createMysqlTable(Database db, CreateTableStmt stmt, boolean isRest
 
         return returnTable;
     }
-    
+
     private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlException {
         String tableName = stmt.getTableName();
 
         // create columns
         List<Column> baseSchema = stmt.getColumns();
         validateColumns(baseSchema);
-        
+
         // create partition info
         PartitionDesc partitionDesc = stmt.getPartitionDesc();
         PartitionInfo partitionInfo = null;
@@ -3456,7 +3459,7 @@ private Table createKuduTable(Database db, CreateTableStmt stmt) throws DdlExcep
         HashDistributionDesc hashDistri = (HashDistributionDesc) stmt.getDistributionDesc();
         kuduCreateOpts.addHashPartitions(hashDistri.getDistributionColumnNames(), hashDistri.getBuckets());
         KuduPartition hashPartition = KuduPartition.createHashPartition(hashDistri.getDistributionColumnNames(),
-                                                                        hashDistri.getBuckets());
+                hashDistri.getBuckets());
 
         // 3. partition, if exist
         KuduPartition rangePartition = null;
@@ -3549,8 +3552,8 @@ private Table createBrokerTable(Database db, CreateTableStmt stmt, boolean isRes
     }
 
     public static void getDdlStmt(Table table, List<String> createTableStmt, List<String> addPartitionStmt,
-            List<String> createRollupStmt, boolean separatePartition, short replicationNum,
-            boolean hidePassword) {
+                                  List<String> createRollupStmt, boolean separatePartition, short replicationNum,
+                                  boolean hidePassword) {
         StringBuilder sb = new StringBuilder();
 
         // 1. create table
@@ -3565,7 +3568,7 @@ public static void getDdlStmt(Table table, List<String> createTableStmt, List<St
 
         // 1.2 other table type
         sb.append("CREATE ");
-        if (table.getType() == TableType.KUDU || table.getType() == TableType.MYSQL 
+        if (table.getType() == TableType.KUDU || table.getType() == TableType.MYSQL
                 || table.getType() == TableType.ELASTICSEARCH) {
             sb.append("EXTERNAL ");
         }
@@ -3708,7 +3711,7 @@ public static void getDdlStmt(Table table, List<String> createTableStmt, List<St
             sb.append(";");
         } else if (table.getType() == TableType.ELASTICSEARCH) {
             EsTable esTable = (EsTable) table;
-            
+
             // partition
             PartitionInfo partitionInfo = esTable.getPartitionInfo();
             if (partitionInfo.getType() == PartitionType.RANGE) {
@@ -3724,7 +3727,7 @@ public static void getDdlStmt(Table table, List<String> createTableStmt, List<St
                 }
                 sb.append(")\n()");
             }
-            
+
             // properties
             sb.append("\nPROPERTIES (\n");
             sb.append("\"host\" = \"").append(esTable.getHosts()).append("\",\n");
@@ -3823,8 +3826,8 @@ public void replayCreateTable(String dbName, Table table) {
     }
 
     private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState,
-            DistributionInfo distributionInfo, long version, long versionHash, short replicationNum,
-            TabletMeta tabletMeta, Set<Long> tabletIdSet) throws DdlException {
+                               DistributionInfo distributionInfo, long version, long versionHash, short replicationNum,
+                               TabletMeta tabletMeta, Set<Long> tabletIdSet) throws DdlException {
         Preconditions.checkArgument(replicationNum > 0);
 
         DistributionInfoType distributionInfoType = distributionInfo.getType();
@@ -3865,7 +3868,7 @@ public void dropTable(DropTableStmt stmt) throws DdlException {
         if (fullNameToDb.get(dbName) == null) {
             ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
         }
-       
+
         Table table = null;
         db.writeLock();
         try {
@@ -3958,7 +3961,7 @@ public void replayRecoverTable(RecoverInfo info) {
     }
 
     public void handleJobsWhenDeleteReplica(long tableId, long partitionId, long indexId, long tabletId, long replicaId,
-            long backendId) {
+                                            long backendId) {
         // rollup
         getRollupHandler().removeReplicaRelatedTask(tableId, partitionId, indexId, tabletId, backendId);
 
@@ -3977,10 +3980,10 @@ public void unprotectAddReplica(ReplicaPersistInfo info) {
         MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId());
         Tablet tablet = materializedIndex.getTablet(info.getTabletId());
         Replica replica = new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(),
-                info.getVersionHash(), info.getDataSize(), info.getRowCount(), ReplicaState.NORMAL, 
-                info.getLastFailedVersion(), 
+                info.getVersionHash(), info.getDataSize(), info.getRowCount(), ReplicaState.NORMAL,
+                info.getLastFailedVersion(),
                 info.getLastFailedVersionHash(),
-                info.getLastSuccessVersion(), 
+                info.getLastSuccessVersion(),
                 info.getLastSuccessVersionHash());
         tablet.addReplica(replica);
     }
@@ -4076,7 +4079,7 @@ public Database getDb(String name) {
                 String clusterName = ClusterNamespace.getClusterNameFromFullName(name);
                 return fullNameToDb.get(ClusterNamespace.getFullName(clusterName, dbName.toLowerCase()));
             }
-        } 
+        }
         return null;
     }
 
@@ -4197,17 +4200,17 @@ public long getNextId() {
                                 && dataProperty.getCooldownTimeMs() < currentTimeMs) {
                             // expire. change to HDD.
                             partitionInfo.setDataProperty(partition.getId(),
-                                                          DataProperty.DEFAULT_HDD_DATA_PROPERTY);
+                                    DataProperty.DEFAULT_HDD_DATA_PROPERTY);
                             storageMediumMap.put(partitionId, TStorageMedium.HDD);
                             LOG.debug("partition[{}-{}-{}] storage medium changed from SSD to HDD",
-                                      dbId, tableId, partitionId);
+                                    dbId, tableId, partitionId);
 
                             // log
                             ModifyPartitionInfo info =
                                     new ModifyPartitionInfo(db.getId(), olapTable.getId(),
-                                                            partition.getId(),
-                                                            DataProperty.DEFAULT_HDD_DATA_PROPERTY,
-                                                            (short) -1);
+                                            partition.getId(),
+                                            DataProperty.DEFAULT_HDD_DATA_PROPERTY,
+                                            (short) -1);
                             editLog.logModifyPartition(info);
                         }
                     } // end for partitions
@@ -4247,6 +4250,10 @@ public Load getLoadInstance() {
         return this.load;
     }
 
+    public RoutineLoad getRoutineLoadInstance() {
+        return routineLoad;
+    }
+
     public ExportMgr getExportMgr() {
         return this.exportMgr;
     }
@@ -4320,7 +4327,7 @@ public String getMasterIp() {
         }
         return this.masterIp;
     }
-    
+
     public EsStateStore getEsStateStore() {
         return this.esStateStore;
     }
@@ -5029,7 +5036,6 @@ public void processModifyCluster(AlterClusterStmt stmt) throws DdlException {
     }
 
     /**
-     *
      * @param ctx
      * @param clusterName
      * @throws DdlException
@@ -5037,7 +5043,7 @@ public void processModifyCluster(AlterClusterStmt stmt) throws DdlException {
     public void changeCluster(ConnectContext ctx, String clusterName) throws DdlException {
         if (!Catalog.getCurrentCatalog().getAuth().checkCanEnterCluster(ConnectContext.get(), clusterName)) {
             ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_AUTHORITY,
-                                           ConnectContext.get().getQualifiedUser(), "enter");
+                    ConnectContext.get().getQualifiedUser(), "enter");
         }
 
         if (!nameToCluster.containsKey(clusterName)) {
@@ -5250,6 +5256,7 @@ public Cluster getCluster(String clusterName) {
 
     /**
      * get migrate progress , when finish migration, next clonecheck will reset dbState
+     *
      * @return
      */
     public Set<BaseParam> getMigrations() {
@@ -5322,14 +5329,14 @@ public long loadCluster(DataInputStream dis, long checksum) throws IOException,
 
                 List<Long> latestBackendIds = systemInfo.getClusterBackendIds(cluster.getName());
                 if (latestBackendIds.size() != cluster.getBackendIdList().size()) {
-                    LOG.warn("Cluster:" + cluster.getName() + ", backends in Cluster is " 
-                        + cluster.getBackendIdList().size() + ", backends in SystemInfoService is "
-                        + cluster.getBackendIdList().size());
+                    LOG.warn("Cluster:" + cluster.getName() + ", backends in Cluster is "
+                            + cluster.getBackendIdList().size() + ", backends in SystemInfoService is "
+                            + cluster.getBackendIdList().size());
                 }
                 // The number of BE in cluster is not same as in SystemInfoService, when perform 'ALTER
                 // SYSTEM ADD BACKEND TO ...' or 'ALTER SYSTEM ADD BACKEND ...', because both of them are 
                 // for adding BE to some Cluster, but loadCluster is after loadBackend.
-                cluster.setBackendIdList(latestBackendIds);               
+                cluster.setBackendIdList(latestBackendIds);
 
                 final InfoSchemaDb db = new InfoSchemaDb(cluster.getName());
                 db.setClusterName(cluster.getName());
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
new file mode 100644
index 00000000..cced4ddc
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import com.google.common.base.Strings;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.SystemIdGenerator;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TResourceInfo;
+import org.apache.doris.thrift.TTaskType;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class KafkaRoutineLoadJob extends RoutineLoadJob {
+    private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJob.class);
+
+    private static final String FE_GROUP_ID = "fe_fetch_partitions";
+    private static final int FETCH_PARTITIONS_TIMEOUT = 10;
+
+    private String serverAddress;
+    private String topic;
+    // optional
+    private List<Integer> kafkaPartitions;
+
+    public KafkaRoutineLoadJob() {
+    }
+
+    public KafkaRoutineLoadJob(long id, String name, String userName, long dbId, long tableId,
+                               String partitions, String columns, String where, String columnSeparator,
+                               int desireTaskConcurrentNum, JobState state, DataSourceType dataSourceType,
+                               int maxErrorNum, TResourceInfo resourceInfo, String serverAddress, String topic) {
+        super(id, name, userName, dbId, tableId, partitions, columns, where,
+                columnSeparator, desireTaskConcurrentNum, state, dataSourceType, maxErrorNum, resourceInfo);
+        this.serverAddress = serverAddress;
+        this.topic = topic;
+    }
+
+    @Override
+    public List<RoutineLoadTask> divideRoutineLoadJob(int currentConcurrentTaskNum) {
+        // divide kafkaPartitions into tasks
+        List<KafkaRoutineLoadTask> kafkaRoutineLoadTaskList = new ArrayList<>();
+        for (int i = 0; i < currentConcurrentTaskNum; i++) {
+            // TODO(ml): init load task
+            kafkaRoutineLoadTaskList.add(new KafkaRoutineLoadTask(getResourceInfo(), 0L, TTaskType.PUSH,
+                    dbId, tableId, 0L, 0L, 0L, SystemIdGenerator.getNextId()));
+        }
+        for (int i = 0; i < kafkaPartitions.size(); i++) {
+            kafkaRoutineLoadTaskList.get(i % currentConcurrentTaskNum).addKafkaPartition(kafkaPartitions.get(i));
+        }
+        List<RoutineLoadTask> result = new ArrayList<>();
+        result.addAll(kafkaRoutineLoadTaskList);
+        return result;
+    }
+
+    @Override
+    public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
+        updatePartitions();
+        SystemInfoService systemInfoService = Catalog.getCurrentSystemInfo();
+        Database db = Catalog.getCurrentCatalog().getDb(dbId);
+        if (db == null) {
+            LOG.warn("db {} is not exists from job {}", dbId, id);
+            throw new MetaNotFoundException("db " + dbId + " is not exists from job " + id);
+        }
+        String clusterName = db.getClusterName();
+        if (Strings.isNullOrEmpty(clusterName)) {
+            LOG.debug("database {} has no cluster name", dbId);
+            clusterName = SystemInfoService.DEFAULT_CLUSTER;
+        }
+        int aliveBeNum = systemInfoService.getClusterBackendIds(clusterName, true).size();
+        int partitionNum = kafkaPartitions.size();
+
+        LOG.info("current concurrent task number is min "
+                        + "(current size of partition {}, desire task concurrent num {}, alive be num {})",
+                partitionNum, desireTaskConcurrentNum, aliveBeNum);
+        return Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum));
+    }
+
+    private void updatePartitions() {
+        // fetch all of kafkaPartitions in topic
+        if (kafkaPartitions == null || kafkaPartitions.size() == 0) {
+            kafkaPartitions = new ArrayList<>();
+            Properties props = new Properties();
+            props.put("bootstrap.servers", this.serverAddress);
+            props.put("group.id", FE_GROUP_ID);
+            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+            List<PartitionInfo> partitionList = consumer.partitionsFor(
+                    topic, Duration.ofSeconds(FETCH_PARTITIONS_TIMEOUT));
+            for (PartitionInfo partitionInfo : partitionList) {
+                kafkaPartitions.add(partitionInfo.partition());
+            }
+        }
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadProgress.java
new file mode 100644
index 00000000..ff46af63
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadProgress.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+public class KafkaRoutineLoadProgress {
+
+    private String partitionName;
+    private long offset;
+}
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java
new file mode 100644
index 00000000..89347745
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import com.google.common.collect.Lists;
+import org.apache.doris.thrift.TResourceInfo;
+import org.apache.doris.thrift.TTaskType;
+
+import java.util.List;
+
+public class KafkaRoutineLoadTask extends RoutineLoadTask {
+
+    private List<Integer> kafkaPartitions;
+
+    public KafkaRoutineLoadTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType,
+                                long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) {
+        super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature);
+        this.kafkaPartitions = Lists.newArrayList();
+    }
+
+    public void addKafkaPartition(int partition) {
+        kafkaPartitions.add(partition);
+    }
+
+    public List<Integer> getKafkaPartitions() {
+        return kafkaPartitions;
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoad.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoad.java
new file mode 100644
index 00000000..0dd6b3cc
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoad.java
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import com.google.common.collect.Maps;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.LoadException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+public class RoutineLoad {
+    private static final Logger LOG = LogManager.getLogger(RoutineLoad.class);
+    private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100;
+
+    // TODO(ml): real-time calculate by be
+    private Map<Long, Integer> beIdToMaxConcurrentTasks;
+
+    // stream load job meta
+    private Map<Long, RoutineLoadJob> idToRoutineLoadJob;
+    private Map<Long, RoutineLoadJob> idToNeedSchedulerRoutineLoadJob;
+    private Map<Long, RoutineLoadJob> idToRunningRoutineLoadJob;
+    private Map<Long, RoutineLoadJob> idToCancelledRoutineLoadJob;
+
+    // stream load tasks meta (not persistent)
+    private Map<Long, RoutineLoadTask> idToRoutineLoadTask;
+    private Map<Long, RoutineLoadTask> idToNeedSchedulerRoutineLoadTask;
+
+    private ReentrantReadWriteLock lock;
+
+    private void readLock() {
+        lock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+
+    public RoutineLoad() {
+        idToRoutineLoadJob = Maps.newHashMap();
+        idToNeedSchedulerRoutineLoadJob = Maps.newHashMap();
+        idToRunningRoutineLoadJob = Maps.newHashMap();
+        idToCancelledRoutineLoadJob = Maps.newHashMap();
+        idToRoutineLoadTask = Maps.newHashMap();
+        idToNeedSchedulerRoutineLoadTask = Maps.newHashMap();
+        lock = new ReentrantReadWriteLock(true);
+    }
+
+    public int getTotalMaxConcurrentTaskNum() {
+        readLock();
+        try {
+            if (beIdToMaxConcurrentTasks == null) {
+                beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true)
+                        .parallelStream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM));
+            }
+            return beIdToMaxConcurrentTasks.values().stream().mapToInt(i -> i).sum();
+        } finally {
+            readUnlock();
+        }
+    }
+
+    public void addRoutineLoadJob(RoutineLoadJob routineLoadJob) {
+        writeLock();
+        try {
+            idToRoutineLoadJob.put(routineLoadJob.getId(), routineLoadJob);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public void addRoutineLoadTasks(List<RoutineLoadTask> routineLoadTaskList) {
+        writeLock();
+        try {
+            idToRoutineLoadTask.putAll(routineLoadTaskList.parallelStream().collect(
+                    Collectors.toMap(task -> task.getSignature(), task -> task)));
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public Map<Long, RoutineLoadTask> getIdToRoutineLoadTask() {
+        return idToRoutineLoadTask;
+    }
+
+    public void addNeedSchedulerRoutineLoadTasks(List<RoutineLoadTask> routineLoadTaskList) {
+        writeLock();
+        try {
+            idToNeedSchedulerRoutineLoadTask.putAll(routineLoadTaskList.parallelStream().collect(
+                    Collectors.toMap(task -> task.getSignature(), task -> task)));
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public void removeRoutineLoadTasks(List<RoutineLoadTask> routineLoadTasks) {
+        if (routineLoadTasks != null) {
+            writeLock();
+            try {
+                routineLoadTasks.parallelStream().forEach(task -> idToRoutineLoadTask.remove(task.getSignature()));
+                routineLoadTasks.parallelStream().forEach(task ->
+                        idToNeedSchedulerRoutineLoadTask.remove(task.getSignature()));
+            } finally {
+                writeUnlock();
+            }
+        }
+    }
+
+    public Map<Long, RoutineLoadTask> getIdToNeedSchedulerRoutineLoadTasks() {
+        readLock();
+        try {
+            return idToNeedSchedulerRoutineLoadTask;
+        } finally {
+            readUnlock();
+        }
+    }
+
+    public List<RoutineLoadJob> getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) throws LoadException {
+        List<RoutineLoadJob> jobs = new ArrayList<>();
+        Collection<RoutineLoadJob> stateJobs = null;
+        readLock();
+        LOG.debug("begin to get routine load job by state {}", jobState.name());
+        try {
+            switch (jobState) {
+                case NEED_SCHEDULER:
+                    stateJobs = idToNeedSchedulerRoutineLoadJob.values();
+                    break;
+                case PAUSED:
+                    throw new LoadException("not support getting paused routine load jobs");
+                case RUNNING:
+                    stateJobs = idToRunningRoutineLoadJob.values();
+                    break;
+                case STOPPED:
+                    throw new LoadException("not support getting stopped routine load jobs");
+                default:
+                    break;
+            }
+            if (stateJobs != null) {
+                jobs.addAll(stateJobs);
+                LOG.info("got {} routine load jobs by state {}", jobs.size(), jobState.name());
+            }
+        } finally {
+            readUnlock();
+        }
+        return jobs;
+    }
+
+    public void updateRoutineLoadJobStateNoValid(RoutineLoadJob routineLoadJob, RoutineLoadJob.JobState jobState) {
+        writeLock();
+        try {
+            RoutineLoadJob.JobState srcJobState = routineLoadJob.getState();
+            long jobId = routineLoadJob.getId();
+            LOG.info("begin to change job {} state from {} to {}", jobId, srcJobState, jobState);
+            switch (jobState) {
+                case NEED_SCHEDULER:
+                    idToRunningRoutineLoadJob.remove(jobId);
+                    idToNeedSchedulerRoutineLoadJob.put(jobId, routineLoadJob);
+                    break;
+                case PAUSED:
+                    idToNeedSchedulerRoutineLoadJob.remove(jobId);
+                    idToRunningRoutineLoadJob.remove(jobId);
+                    break;
+                case RUNNING:
+                    idToNeedSchedulerRoutineLoadJob.remove(jobId, routineLoadJob);
+                    idToRunningRoutineLoadJob.put(jobId, routineLoadJob);
+                    break;
+                case CANCELLED:
+                    idToRunningRoutineLoadJob.remove(jobId);
+                    idToNeedSchedulerRoutineLoadJob.remove(jobId);
+                    idToCancelledRoutineLoadJob.put(jobId, routineLoadJob);
+                    break;
+                case STOPPED:
+                    idToRunningRoutineLoadJob.remove(jobId);
+                    idToNeedSchedulerRoutineLoadJob.remove(jobId);
+                    break;
+                default:
+                    break;
+            }
+            routineLoadJob.setState(jobState);
+            Catalog.getInstance().getEditLog().logRoutineLoadJob(routineLoadJob);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public void updateRoutineLoadJobState(RoutineLoadJob routineLoadJob, RoutineLoadJob.JobState jobState)
+            throws LoadException {
+        writeLock();
+        try {
+            RoutineLoadJob.JobState srcJobState = routineLoadJob.getState();
+            long jobId = routineLoadJob.getId();
+            LOG.info("begin to change job {} state from {} to {}", jobId, srcJobState, jobState);
+            checkStateTransform(srcJobState, jobState);
+            switch (jobState) {
+                case NEED_SCHEDULER:
+                    idToRunningRoutineLoadJob.remove(jobId);
+                    idToNeedSchedulerRoutineLoadJob.put(jobId, routineLoadJob);
+                    break;
+                case PAUSED:
+                    idToNeedSchedulerRoutineLoadJob.remove(jobId);
+                    idToRunningRoutineLoadJob.remove(jobId);
+                    break;
+                case RUNNING:
+                    idToNeedSchedulerRoutineLoadJob.remove(jobId, routineLoadJob);
+                    idToRunningRoutineLoadJob.put(jobId, routineLoadJob);
+                    break;
+                case CANCELLED:
+                    idToRunningRoutineLoadJob.remove(jobId);
+                    idToNeedSchedulerRoutineLoadJob.remove(jobId);
+                    idToCancelledRoutineLoadJob.put(jobId, routineLoadJob);
+                    break;
+                case STOPPED:
+                    idToRunningRoutineLoadJob.remove(jobId);
+                    idToNeedSchedulerRoutineLoadJob.remove(jobId);
+                    break;
+                default:
+                    break;
+            }
+            routineLoadJob.setState(jobState);
+            Catalog.getInstance().getEditLog().logRoutineLoadJob(routineLoadJob);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    private void checkStateTransform(RoutineLoadJob.JobState currentState, RoutineLoadJob.JobState desireState)
+            throws LoadException {
+        if (currentState == RoutineLoadJob.JobState.PAUSED && desireState == RoutineLoadJob.JobState.NEED_SCHEDULER) {
+            throw new LoadException("could not transform " + currentState + " to " + desireState);
+        } else if (currentState == RoutineLoadJob.JobState.CANCELLED ||
+                currentState == RoutineLoadJob.JobState.STOPPED) {
+            throw new LoadException("could not transform " + currentState + " to " + desireState);
+        }
+    }
+
+}
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
new file mode 100644
index 00000000..af737356
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.thrift.TResourceInfo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Routine load job is a function which stream load data from streaming medium to doris.
+ * This function is suitable for streaming load job which loading data continuously
+ * The properties include stream load properties and job properties.
+ * The desireTaskConcurrentNum means that user expect the number of concurrent stream load
+ * The routine load job support different streaming medium such as KAFKA
+ */
+public class RoutineLoadJob implements Writable {
+
+    public enum JobState {
+        NEED_SCHEDULER,
+        RUNNING,
+        PAUSED,
+        STOPPED,
+        CANCELLED
+    }
+
+    public enum DataSourceType {
+        KAFKA
+    }
+
+    protected long id;
+    protected String name;
+    protected String userName;
+    protected long dbId;
+    protected long tableId;
+    protected String partitions;
+    protected String columns;
+    protected String where;
+    protected String columnSeparator;
+    protected int desireTaskConcurrentNum;
+    protected JobState state;
+    protected DataSourceType dataSourceType;
+    // max number of error data in ten thousand data
+    protected int maxErrorNum;
+    protected String progress;
+    protected ReentrantReadWriteLock lock;
+    // TODO(ml): error sample
+
+
+    public RoutineLoadJob() {
+    }
+
+    public RoutineLoadJob(long id, String name, String userName, long dbId, long tableId,
+                          String partitions, String columns, String where, String columnSeparator,
+                          int desireTaskConcurrentNum, JobState state, DataSourceType dataSourceType,
+                          int maxErrorNum, TResourceInfo resourceInfo) {
+        this.id = id;
+        this.name = name;
+        this.userName = userName;
+        this.dbId = dbId;
+        this.tableId = tableId;
+        this.partitions = partitions;
+        this.columns = columns;
+        this.where = where;
+        this.columnSeparator = columnSeparator;
+        this.desireTaskConcurrentNum = desireTaskConcurrentNum;
+        this.state = state;
+        this.dataSourceType = dataSourceType;
+        this.maxErrorNum = maxErrorNum;
+        this.resourceInfo = resourceInfo;
+        this.progress = "";
+        lock = new ReentrantReadWriteLock(true);
+    }
+
+    public void readLock() {
+        lock.readLock().lock();
+    }
+
+    public void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    public void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    public void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+
+    // thrift object
+    private TResourceInfo resourceInfo;
+
+    public long getId() {
+        return id;
+    }
+
+    public JobState getState() {
+        return state;
+    }
+
+    public void setState(JobState state) {
+        this.state = state;
+    }
+
+    public TResourceInfo getResourceInfo() {
+        return resourceInfo;
+    }
+
+    public List<RoutineLoadTask> divideRoutineLoadJob(int currentConcurrentTaskNum) {
+        return null;
+    }
+
+    public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
+        return 0;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        // TODO(ml)
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        // TODO(ml)
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
new file mode 100644
index 00000000..6c3cdefd
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.util.Daemon;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+public class RoutineLoadScheduler extends Daemon {
+
+    private static final Logger LOG = LogManager.getLogger(RoutineLoadScheduler.class);
+
+    private RoutineLoad routineLoad = Catalog.getInstance().getRoutineLoadInstance();
+
+    @Override
+    protected void runOneCycle() {
+        // get need scheduler routine jobs
+        List<RoutineLoadJob> routineLoadJobList = null;
+        try {
+            routineLoadJobList = getNeedSchedulerRoutineJobs();
+        } catch (LoadException e) {
+            LOG.error("failed to get need scheduler routine jobs");
+        }
+
+        LOG.debug("there are {} job need scheduler", routineLoadJobList.size());
+        for (RoutineLoadJob routineLoadJob : routineLoadJobList) {
+            // judge nums of tasks more then max concurrent tasks of cluster
+            List<RoutineLoadTask> routineLoadTaskList = null;
+            try {
+                routineLoadJob.writeLock();
+
+                if (routineLoadJob.getState() == RoutineLoadJob.JobState.NEED_SCHEDULER) {
+                    int currentConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum();
+                    int totalTaskNum = currentConcurrentTaskNum + routineLoad.getIdToRoutineLoadTask().size();
+                    if (totalTaskNum > routineLoad.getTotalMaxConcurrentTaskNum()) {
+                        LOG.info("job {} concurrent task num {}, current total task num {}. "
+                                        + "desired total task num {} more then total max task num {}, "
+                                        + "skip this turn of scheduler",
+                                routineLoadJob.getId(), currentConcurrentTaskNum,
+                                routineLoad.getIdToRoutineLoadTask().size(),
+                                totalTaskNum, routineLoad.getTotalMaxConcurrentTaskNum());
+                        break;
+                    }
+                    // divide job into tasks
+                    routineLoadTaskList = routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum);
+
+                    // update tasks meta
+                    routineLoad.addRoutineLoadTasks(routineLoadTaskList);
+                    routineLoad.addNeedSchedulerRoutineLoadTasks(routineLoadTaskList);
+
+                    // change job state to running
+                    routineLoad.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.RUNNING);
+                }
+            } catch (MetaNotFoundException e) {
+                routineLoad.updateRoutineLoadJobStateNoValid(routineLoadJob, RoutineLoadJob.JobState.CANCELLED);
+            } catch (LoadException e) {
+                LOG.error("failed to scheduler job {} with error massage {}", routineLoadJob.getId(),
+                        e.getMessage(), e);
+                routineLoad.removeRoutineLoadTasks(routineLoadTaskList);
+            } finally {
+                routineLoadJob.writeUnlock();
+            }
+        }
+
+    }
+
+    private List<RoutineLoadJob> getNeedSchedulerRoutineJobs() throws LoadException {
+        return routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER);
+    }
+
+
+}
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java
new file mode 100644
index 00000000..184feee8
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import org.apache.doris.task.AgentTask;
+import org.apache.doris.thrift.TResourceInfo;
+import org.apache.doris.thrift.TTaskType;
+
+public class RoutineLoadTask extends AgentTask{
+
+    public RoutineLoadTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType,
+                           long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) {
+        super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature);
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index 5e787944..e7e33109 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -49,6 +49,7 @@
 import org.apache.doris.load.Load;
 import org.apache.doris.load.LoadErrorHub;
 import org.apache.doris.load.LoadJob;
+import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.UserProperty;
 import org.apache.doris.mysql.privilege.UserPropertyInfo;
@@ -202,7 +203,7 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) {
                     DropPartitionInfo info = (DropPartitionInfo) journal.getData();
                     LOG.info("Begin to unprotect drop partition. db = " + info.getDbId()
                             + " table = " + info.getTableId()
-                                + " partitionName = " + info.getPartitionName());
+                            + " partitionName = " + info.getPartitionName());
                     catalog.replayDropPartition(info);
                     break;
                 }
@@ -505,8 +506,8 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) {
                     int version = Integer.parseInt(versionString);
                     if (catalog.getJournalVersion() > FeConstants.meta_version) {
                         LOG.error("meta data version is out of date, image: {}. meta: {}."
-                                + "please update FeConstants.meta_version and restart.",
-                                  catalog.getJournalVersion(), FeConstants.meta_version);
+                                        + "please update FeConstants.meta_version and restart.",
+                                catalog.getJournalVersion(), FeConstants.meta_version);
                         System.exit(-1);
                     }
                     catalog.setJournalVersion(version);
@@ -663,7 +664,7 @@ private synchronized void logEdit(short op, Writable writable) {
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("nextId = {}, numTransactions = {}, totalTimeTransactions = {}, op = {}",
-                      txId, numTransactions, totalTimeTransactions, op);
+                    txId, numTransactions, totalTimeTransactions, op);
         }
 
         if (txId == Config.edit_log_roll_num) {
@@ -691,7 +692,7 @@ public synchronized long getTxId() {
     public void logSaveNextId(long nextId) {
         logEdit(OperationType.OP_SAVE_NEXTID, new Text(Long.toString(nextId)));
     }
-    
+
     public void logSaveTransactionId(long transactionId) {
         logEdit(OperationType.OP_SAVE_TRANSACTION_ID, new Text(Long.toString(transactionId)));
     }
@@ -776,6 +777,10 @@ public void logLoadDone(LoadJob job) {
         logEdit(OperationType.OP_LOAD_DONE, job);
     }
 
+    public void logRoutineLoadJob(RoutineLoadJob job) {
+        logEdit(OperationType.OP_ROUTINE_LOAD_JOB, job);
+    }
+
     public void logStartRollup(RollupJob rollupJob) {
         logEdit(OperationType.OP_START_ROLLUP, rollupJob);
     }
@@ -783,7 +788,7 @@ public void logStartRollup(RollupJob rollupJob) {
     public void logFinishingRollup(RollupJob rollupJob) {
         logEdit(OperationType.OP_FINISHING_ROLLUP, rollupJob);
     }
-    
+
     public void logFinishRollup(RollupJob rollupJob) {
         logEdit(OperationType.OP_FINISH_ROLLUP, rollupJob);
     }
@@ -1027,7 +1032,7 @@ public void logInsertTransactionState(TransactionState transactionState) {
     public void logDeleteTransactionState(TransactionState transactionState) {
         logEdit(OperationType.OP_DELETE_TRANSACTION_STATE, transactionState);
     }
-    
+
     public void logBackupJob(BackupJob job) {
         logEdit(OperationType.OP_BACKUP_JOB, job);
     }
diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java b/fe/src/main/java/org/apache/doris/persist/OperationType.java
index 9d2344f6..491ab4c7 100644
--- a/fe/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java
@@ -138,4 +138,7 @@
     public static final short OP_FINISHING_SCHEMA_CHANGE = 103;
     public static final short OP_SAVE_TRANSACTION_ID = 104;
 
+    // routine load 110~120
+    public static final short OP_ROUTINE_LOAD_JOB = 110;
+
 }
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
new file mode 100644
index 00000000..ab88bff6
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import com.google.common.collect.Lists;
+import mockit.Deencapsulation;
+import mockit.Delegate;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TResourceInfo;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class KafkaRoutineLoadJobTest {
+
+    @Test
+    public void testBeNumMin(@Mocked KafkaConsumer kafkaConsumer,
+                             @Injectable PartitionInfo partitionInfo1,
+                             @Injectable PartitionInfo partitionInfo2,
+                             @Mocked Catalog catalog,
+                             @Mocked SystemInfoService systemInfoService,
+                             @Mocked Database database) throws MetaNotFoundException {
+        List<PartitionInfo> partitionInfoList = new ArrayList<>();
+        partitionInfoList.add(partitionInfo1);
+        partitionInfoList.add(partitionInfo2);
+        List<Long> beIds = Lists.newArrayList(1L);
+
+        String clusterName = "clusterA";
+
+        new Expectations() {
+            {
+                kafkaConsumer.partitionsFor(anyString, (Duration) any);
+                result = partitionInfoList;
+                Catalog.getCurrentSystemInfo();
+                result = systemInfoService;
+                Catalog.getCurrentCatalog();
+                result = catalog;
+                catalog.getDb(anyLong);
+                result = database;
+                database.getClusterName();
+                result = clusterName;
+                systemInfoService.getClusterBackendIds(clusterName, true);
+                result = beIds;
+            }
+        };
+
+        KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "miaoling", 1L,
+                1L, "1L", "v1", "", "", 3,
+                RoutineLoadJob.JobState.NEED_SCHEDULER, RoutineLoadJob.DataSourceType.KAFKA, 0, new TResourceInfo(),
+                "", "");
+        Assert.assertEquals(1, kafkaRoutineLoadJob.calculateCurrentConcurrentTaskNum());
+    }
+
+
+    @Test
+    public void testDivideRoutineLoadJob() {
+
+        KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "miaoling", 1L,
+                1L, "1L", "v1", "", "", 3,
+                RoutineLoadJob.JobState.NEED_SCHEDULER, RoutineLoadJob.DataSourceType.KAFKA, 0, new TResourceInfo(),
+                "", "");
+
+        Deencapsulation.setField(kafkaRoutineLoadJob, "kafkaPartitions", Arrays.asList(1, 4, 6));
+
+        List<RoutineLoadTask> result = kafkaRoutineLoadJob.divideRoutineLoadJob(2);
+        Assert.assertEquals(2, result.size());
+        for (RoutineLoadTask routineLoadTask : result) {
+            KafkaRoutineLoadTask kafkaRoutineLoadTask = (KafkaRoutineLoadTask) routineLoadTask;
+            if (kafkaRoutineLoadTask.getKafkaPartitions().size() == 2) {
+                Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(1));
+                Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(6));
+            } else if (kafkaRoutineLoadTask.getKafkaPartitions().size() == 1) {
+                Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(4));
+            } else {
+                Assert.fail();
+            }
+        }
+    }
+}
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
new file mode 100644
index 00000000..2a9e4de3
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.system.SystemInfoService;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Catalog.class})
+public class RoutineLoadSchedulerTest {
+
+    @Test
+    public void testNormalRunOneCycle() throws LoadException, MetaNotFoundException {
+        int taskNum = 1;
+        List<RoutineLoadTask> routineLoadTaskList = new ArrayList<>();
+        KafkaRoutineLoadTask kafkaRoutineLoadTask = EasyMock.createNiceMock(KafkaRoutineLoadTask.class);
+        EasyMock.expect(kafkaRoutineLoadTask.getSignature()).andReturn(1L).anyTimes();
+        EasyMock.replay(kafkaRoutineLoadTask);
+        routineLoadTaskList.add(kafkaRoutineLoadTask);
+
+        KafkaRoutineLoadJob routineLoadJob = EasyMock.createNiceMock(KafkaRoutineLoadJob.class);
+        EasyMock.expect(routineLoadJob.calculateCurrentConcurrentTaskNum()).andReturn(taskNum).anyTimes();
+        EasyMock.expect(routineLoadJob.divideRoutineLoadJob(taskNum)).andReturn(routineLoadTaskList).anyTimes();
+        EasyMock.expect(routineLoadJob.getState()).andReturn(RoutineLoadJob.JobState.NEED_SCHEDULER).anyTimes();
+        EasyMock.replay(routineLoadJob);
+
+        SystemInfoService systemInfoService = EasyMock.createNiceMock(SystemInfoService.class);
+        List<Long> beIds = Arrays.asList(1L, 2L, 3L);
+        EasyMock.expect(systemInfoService.getBackendIds(true)).andReturn(beIds).anyTimes();
+        EasyMock.replay(systemInfoService);
+
+        Catalog catalog = EasyMock.createNiceMock(Catalog.class);
+        EditLog editLog = EasyMock.createNiceMock(EditLog.class);
+        PowerMock.mockStatic(Catalog.class);
+        EasyMock.expect(Catalog.getCurrentSystemInfo()).andReturn(systemInfoService).anyTimes();
+        EasyMock.expect(Catalog.getInstance()).andReturn(catalog).anyTimes();
+        PowerMock.replay(Catalog.class);
+
+
+        RoutineLoad routineLoad = new RoutineLoad();
+        EasyMock.expect(catalog.getEditLog()).andReturn(editLog).anyTimes();
+        EasyMock.expect(catalog.getRoutineLoadInstance()).andReturn(routineLoad).anyTimes();
+        EasyMock.replay(catalog);
+
+        routineLoad.addRoutineLoadJob(routineLoadJob);
+        routineLoad.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.NEED_SCHEDULER);
+
+        RoutineLoadScheduler routineLoadScheduler = new RoutineLoadScheduler();
+        routineLoadScheduler.runOneCycle();
+
+        Assert.assertEquals(1, routineLoad.getIdToRoutineLoadTask().size());
+        Assert.assertEquals(1, routineLoad.getIdToNeedSchedulerRoutineLoadTasks().size());
+        Assert.assertEquals(1, routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.RUNNING).size());
+        Assert.assertEquals(0, routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER).size());
+
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@doris.apache.org
For additional commands, e-mail: dev-help@doris.apache.org