You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/21 01:12:27 UTC

[doris] branch master updated: [improvement](fe-ut) use local journal to make FE ut run fast (#11038)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d36b927fdb [improvement](fe-ut) use local journal to make FE ut run fast (#11038)
d36b927fdb is described below

commit d36b927fdbc44492f45c28747bbdc7cfd473f23b
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu Jul 21 09:12:21 2022 +0800

    [improvement](fe-ut) use local journal to make FE ut run fast (#11038)
    
    * [improvement](fe-ut) use local journal to make FE ut run fast
---
 .../src/main/java/org/apache/doris/PaloFe.java     |  43 ++--
 .../java/org/apache/doris/catalog/Catalog.java     |  37 +--
 .../main/java/org/apache/doris/common/Config.java  |   4 +-
 .../org/apache/doris/common/util/MasterDaemon.java |   2 +-
 .../org/apache/doris/journal/JournalEntity.java    |   5 +
 .../apache/doris/journal/local/LocalJournal.java   |  12 +-
 .../doris/journal/local/LocalJournalCursor.java    | 279 +++++----------------
 .../org/apache/doris/metric/SystemMetrics.java     |   4 +-
 .../java/org/apache/doris/mysql/MysqlChannel.java  |   2 +-
 .../java/org/apache/doris/persist/EditLog.java     |  10 +-
 .../doris/persist/EditLogFileOutputStream.java     |   4 +-
 .../org/apache/doris/persist/OperationType.java    |   3 +-
 .../java/org/apache/doris/system/Frontend.java     |   3 +-
 .../org/apache/doris/utframe/MockedFrontend.java   |  25 +-
 .../apache/doris/utframe/TestWithFeService.java    |  49 ++--
 .../org/apache/doris/utframe/UtFrameUtils.java     |  38 ++-
 16 files changed, 228 insertions(+), 292 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
index a692789cdf..817c641982 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
@@ -61,11 +61,14 @@ public class PaloFe {
     public static final String PID_DIR = System.getenv("PID_DIR");
 
     public static void main(String[] args) {
-        start(DORIS_HOME_DIR, PID_DIR, args);
+        StartupOptions options = new StartupOptions();
+        options.enableHttpServer = true;
+        options.enableQeService = true;
+        start(DORIS_HOME_DIR, PID_DIR, args, options);
     }
 
     // entrance for doris frontend
-    public static void start(String dorisHomeDir, String pidDir, String[] args) {
+    public static void start(String dorisHomeDir, String pidDir, String[] args, StartupOptions options) {
         if (System.getenv("DORIS_LOG_TO_STDERR") != null) {
             Log4jConfig.foreground = true;
         }
@@ -135,24 +138,27 @@ public class PaloFe {
             // 1. HttpServer for HTTP Server
             // 2. FeServer for Thrift Server
             // 3. QeService for MySQL Server
-            QeService qeService = new QeService(Config.query_port, Config.mysql_service_nio_enabled,
-                    ExecuteEnv.getInstance().getScheduler());
             FeServer feServer = new FeServer(Config.rpc_port);
-
             feServer.start();
 
-            HttpServer httpServer = new HttpServer();
-            httpServer.setPort(Config.http_port);
-            httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size);
-            httpServer.setAcceptors(Config.jetty_server_acceptors);
-            httpServer.setSelectors(Config.jetty_server_selectors);
-            httpServer.setWorkers(Config.jetty_server_workers);
-            httpServer.setMaxThreads(Config.jetty_threadPool_maxThreads);
-            httpServer.setMinThreads(Config.jetty_threadPool_minThreads);
-            httpServer.setMaxHttpHeaderSize(Config.jetty_server_max_http_header_size);
-            httpServer.start();
+            if (options.enableHttpServer) {
+                HttpServer httpServer = new HttpServer();
+                httpServer.setPort(Config.http_port);
+                httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size);
+                httpServer.setAcceptors(Config.jetty_server_acceptors);
+                httpServer.setSelectors(Config.jetty_server_selectors);
+                httpServer.setWorkers(Config.jetty_server_workers);
+                httpServer.setMaxThreads(Config.jetty_threadPool_maxThreads);
+                httpServer.setMinThreads(Config.jetty_threadPool_minThreads);
+                httpServer.setMaxHttpHeaderSize(Config.jetty_server_max_http_header_size);
+                httpServer.start();
+            }
 
-            qeService.start();
+            if (options.enableQeService) {
+                QeService qeService = new QeService(Config.query_port, Config.mysql_service_nio_enabled,
+                        ExecuteEnv.getInstance().getScheduler());
+                qeService.start();
+            }
 
             ThreadPoolManager.registerAllThreadPoolMetric();
 
@@ -372,4 +378,9 @@ public class PaloFe {
             throw e;
         }
     }
+
+    public static class StartupOptions {
+        public boolean enableHttpServer = true;
+        public boolean enableQeService = true;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index d865769094..40a93b1510 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -270,7 +270,7 @@ public class Catalog {
     private static final int STATE_CHANGE_CHECK_INTERVAL_MS = 100;
     private static final int REPLAY_INTERVAL_MS = 1;
     private static final String BDB_DIR = "/bdb";
-    private static final String IMAGE_DIR = "/image";
+    public static final String IMAGE_DIR = "/image";
 
     private String metaDir;
     private String bdbDir;
@@ -802,13 +802,10 @@ public class Catalog {
             if (!bdbDir.exists()) {
                 bdbDir.mkdirs();
             }
-
-            File imageDir = new File(this.imageDir);
-            if (!imageDir.exists()) {
-                imageDir.mkdirs();
-            }
-        } else {
-            throw new Exception("Invalid edit log type: " + Config.edit_log_type);
+        }
+        File imageDir = new File(this.imageDir);
+        if (!imageDir.exists()) {
+            imageDir.mkdirs();
         }
 
         // init plugin manager
@@ -834,18 +831,27 @@ public class Catalog {
         // 6. start state listener thread
         createStateListener();
         listener.start();
+
+        if (!Config.edit_log_type.equalsIgnoreCase("bdb")) {
+            // If not using bdb, we need to notify the FE type transfer manually.
+            notifyNewFETypeTransfer(FrontendNodeType.MASTER);
+        }
     }
 
     // wait until FE is ready.
     public void waitForReady() throws InterruptedException {
+        long counter = 0;
         while (true) {
             if (isReady()) {
                 LOG.info("catalog is ready. FE type: {}", feType);
                 break;
             }
 
-            Thread.sleep(2000);
-            LOG.info("wait catalog to be ready. FE type: {}. is ready: {}", feType, isReady.get());
+            Thread.sleep(100);
+            if (counter++ % 20 == 0) {
+                LOG.info("wait catalog to be ready. FE type: {}. is ready: {}, counter: {}", feType, isReady.get(),
+                        counter);
+            }
         }
     }
 
@@ -1224,9 +1230,11 @@ public class Catalog {
 
         editLog.open();
 
-        if (!haProtocol.fencing()) {
-            LOG.error("fencing failed. will exit.");
-            System.exit(-1);
+        if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
+            if (!haProtocol.fencing()) {
+                LOG.error("fencing failed. will exit.");
+                System.exit(-1);
+            }
         }
 
         long replayStartTime = System.currentTimeMillis();
@@ -1285,7 +1293,6 @@ public class Catalog {
 
         canRead.set(true);
         isReady.set(true);
-
         checkLowerCaseTableNames();
 
         String msg = "master finished to replay journal, can write now.";
@@ -1404,7 +1411,7 @@ public class Catalog {
         // transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER
 
         // add helper sockets
-        if (Config.edit_log_type.equalsIgnoreCase("BDB")) {
+        if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
             for (Frontend fe : frontends.values()) {
                 if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) {
                     ((BDBHA) getHaProtocol()).addHelperSocket(fe.getHost(), fe.getEditLogPort());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 7367f97e8f..289c4d24bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -188,10 +188,10 @@ public class Config extends ConfigBase {
     /**
      * Edit log type.
      * BDB: write log to bdbje
-     * LOCAL: deprecated.
+     * LOCAL: write log to local files; used only for unit tests
      */
     @ConfField
-    public static String edit_log_type = "BDB";
+    public static String edit_log_type = "bdb";
 
     /**
      * bdbje port
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/MasterDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/MasterDaemon.java
index 103880ef99..8d1aa8ebdb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/MasterDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/MasterDaemon.java
@@ -49,7 +49,7 @@ public class MasterDaemon extends Daemon {
             try {
                 // not return, but sleep a while. to avoid some thread with large running interval will
                 // wait for a long time to start again.
-                Thread.sleep(10 * 1000);
+                Thread.sleep(100);
             } catch (InterruptedException e) {
                 LOG.warn("interrupted exception. thread: {}", getName(), e);
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 0788662622..943315def0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -147,6 +147,11 @@ public class JournalEntity implements Writable {
         boolean isRead = false;
         LOG.debug("get opcode: {}", opCode);
         switch (opCode) {
+            case OperationType.OP_LOCAL_EOF: {
+                data = null;
+                isRead = true;
+                break;
+            }
             case OperationType.OP_SAVE_NEXTID: {
                 data = new Text();
                 ((Text) data).readFields(in);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
index e656955dba..e54237a7bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
@@ -25,6 +25,7 @@ import org.apache.doris.persist.EditLogFileOutputStream;
 import org.apache.doris.persist.EditLogOutputStream;
 import org.apache.doris.persist.Storage;
 
+import com.google.common.base.Preconditions;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -54,10 +55,18 @@ public class LocalJournal implements Journal {
                 this.journalId.set(getCurrentJournalId(storage.getEditsFileSequenceNumbers()));
 
                 long id = journalId.get();
-                if (id == storage.getEditsSeq()) {
+                if (storage.getEditsSeq() == 0) {
+                    // there is no edits file, create first one
+                    Preconditions.checkState(id == 1, id);
+                    currentEditFile = new File(imageDir, "edits.1");
+                    currentEditFile.createNewFile();
+                    outputStream = new EditLogFileOutputStream(currentEditFile);
+                } else if (id == storage.getEditsSeq()) {
+                    // there exist edits files, point to the latest one and set position to the end of file.
                     this.currentEditFile = storage.getEditsFile(id);
                     this.outputStream = new EditLogFileOutputStream(currentEditFile);
                 } else {
+                    // create next edits file
                     currentEditFile = new File(imageDir, "edits." + (id + 1));
                     currentEditFile.createNewFile();
                     outputStream = new EditLogFileOutputStream(currentEditFile);
@@ -78,6 +87,7 @@ public class LocalJournal implements Journal {
                 return;
             }
             if (outputStream != null) {
+                outputStream.flush();
                 outputStream.close();
             }
             currentEditFile = new File(imageDir, "edits." + journalId.get());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java
index ffef5bb74e..8031a2c991 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java
@@ -17,33 +17,11 @@
 
 package org.apache.doris.journal.local;
 
-import org.apache.doris.catalog.Database;
-import org.apache.doris.common.io.Text;
-import org.apache.doris.ha.MasterInfo;
 import org.apache.doris.journal.JournalCursor;
 import org.apache.doris.journal.JournalEntity;
-import org.apache.doris.journal.bdbje.Timestamp;
-import org.apache.doris.load.DeleteInfo;
-import org.apache.doris.load.LoadErrorHub;
-import org.apache.doris.load.LoadJob;
-import org.apache.doris.persist.BatchDropInfo;
-import org.apache.doris.persist.BatchModifyPartitionsInfo;
-import org.apache.doris.persist.ConsistencyCheckInfo;
-import org.apache.doris.persist.CreateTableInfo;
-import org.apache.doris.persist.DatabaseInfo;
-import org.apache.doris.persist.DropInfo;
-import org.apache.doris.persist.DropPartitionInfo;
 import org.apache.doris.persist.EditLogFileInputStream;
-import org.apache.doris.persist.ModifyPartitionInfo;
 import org.apache.doris.persist.OperationType;
-import org.apache.doris.persist.PartitionPersistInfo;
-import org.apache.doris.persist.RecoverInfo;
-import org.apache.doris.persist.RefreshExternalTableInfo;
-import org.apache.doris.persist.ReplicaPersistInfo;
 import org.apache.doris.persist.Storage;
-import org.apache.doris.persist.TableInfo;
-import org.apache.doris.system.Backend;
-import org.apache.doris.system.Frontend;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -55,7 +33,10 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
-@Deprecated
+/**
+ * For unit test only.
+ * Use local file save edit logs.
+ */
 public final class LocalJournalCursor implements JournalCursor {
     private static final Logger LOG = LogManager.getLogger(LocalJournalCursor.class);
     private String imageDir;
@@ -121,35 +102,29 @@ public final class LocalJournalCursor implements JournalCursor {
                 new EditLogFileInputStream(new File(imageDir, "edits." + fileName))));
 
         while (scannedKey < fromKey) {
-            short opCode = currentStream.readShort();
-            if (opCode == OperationType.OP_INVALID) {
-                System.out.println("Can not find the key:" + fromKey);
-                throw new IOException();
-            }
-            getJournalEntity(currentStream, opCode);
+            getJournalEntity(currentStream);
             scannedKey++;
         }
     }
 
-    @Override
-    public JournalEntity next() {
+    public JournalEntity next2() {
         if (currentKey > toKey) {
             return null;
         }
 
         JournalEntity ret = null;
         try {
-            short opCode = OperationType.OP_INVALID;
+            short opCode = OperationType.OP_LOCAL_EOF;
 
             while (true) {
                 try {
                     opCode = currentStream.readShort();
-                    if (opCode == OperationType.OP_INVALID) {
+                    if (opCode == OperationType.OP_LOCAL_EOF) {
                         if (nextFilePositionIndex < editFileSequenceNumbers.size()) {
                             currentStream.close();
                             currentStream = new DataInputStream(new BufferedInputStream(new EditLogFileInputStream(
-                                    new File(imageDir, "edits." + editFileSequenceNumbers
-                                            .get(nextFilePositionIndex)))));
+                                    new File(imageDir,
+                                            "edits." + editFileSequenceNumbers.get(nextFilePositionIndex)))));
                             nextFilePositionIndex++;
                             continue;
                         } else {
@@ -159,9 +134,8 @@ public final class LocalJournalCursor implements JournalCursor {
                 } catch (EOFException e) {
                     if (nextFilePositionIndex < editFileSequenceNumbers.size()) {
                         currentStream.close();
-                        currentStream = new DataInputStream(
-                                new BufferedInputStream(new EditLogFileInputStream(new File(
-                                        imageDir, "edits." + editFileSequenceNumbers.get(nextFilePositionIndex)))));
+                        currentStream = new DataInputStream(new BufferedInputStream(new EditLogFileInputStream(
+                                new File(imageDir, "edits." + editFileSequenceNumbers.get(nextFilePositionIndex)))));
                         nextFilePositionIndex++;
                         continue;
                     } else {
@@ -171,7 +145,7 @@ public final class LocalJournalCursor implements JournalCursor {
                 break;
             }
 
-            ret = getJournalEntity(currentStream, opCode);
+            ret = getJournalEntity(currentStream);
             currentKey++;
             return ret;
         } catch (IOException e) {
@@ -186,191 +160,64 @@ public final class LocalJournalCursor implements JournalCursor {
         return ret;
     }
 
-    @Deprecated
-    private JournalEntity getJournalEntity(DataInputStream in, short opCode) throws IOException {
-        JournalEntity ret = new JournalEntity();
-        ret.setOpCode(opCode);
-        switch (opCode) {
-            case OperationType.OP_SAVE_NEXTID: {
-                Text text = new Text();
-                text.readFields(in);
-                ret.setData(text);
-                break;
-            }
-            case OperationType.OP_SAVE_TRANSACTION_ID: {
-                Text text = new Text();
-                text.readFields(in);
-                ret.setData(text);
-                break;
-            }
-            case OperationType.OP_CREATE_DB: {
-                Database db = new Database();
-                db.readFields(in);
-                ret.setData(db);
-                break;
-            }
-            case OperationType.OP_DROP_DB: {
-                Text text = new Text();
-                text.readFields(in);
-                ret.setData(text);
-                break;
-            }
-            case OperationType.OP_ALTER_DB:
-            case OperationType.OP_RENAME_DB: {
-                DatabaseInfo info = new DatabaseInfo();
-                info.readFields(in);
-                ret.setData(info);
-                break;
-            }
-            case OperationType.OP_CREATE_TABLE: {
-                CreateTableInfo info = new CreateTableInfo();
-                info.readFields(in);
-                ret.setData(info);
-                break;
-            }
-            case OperationType.OP_DROP_TABLE: {
-                DropInfo info = new DropInfo();
-                info.readFields(in);
-                ret.setData(info);
-                break;
-            }
-            case OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA: {
-                RefreshExternalTableInfo info = RefreshExternalTableInfo.read(in);
-                ret.setData(info);
-                break;
-            }
-            case OperationType.OP_ADD_PARTITION: {
-                PartitionPersistInfo info = new PartitionPersistInfo();
-                info.readFields(in);
-                ret.setData(info);
-                break;
-            }
-            case OperationType.OP_DROP_PARTITION: {
-                DropPartitionInfo info = DropPartitionInfo.read(in);
-                ret.setData(info);
-                break;
-            }
-            case OperationType.OP_MODIFY_PARTITION: {
-                ModifyPartitionInfo info = ModifyPartitionInfo.read(in);
-                ret.setData(info);
-                break;
-            }
-            case OperationType.OP_BATCH_MODIFY_PARTITION: {
-                BatchModifyPartitionsInfo info = BatchModifyPartitionsInfo.read(in);
-                ret.setData(info);
-                break;
-            }
-            case OperationType.OP_ERASE_DB:
-            case OperationType.OP_ERASE_TABLE:
-            case OperationType.OP_ERASE_PARTITION: {
-                Text text = new Text();
-                text.readFields(in);
-                ret.setData(text);
-                break;
-            }
-            case OperationType.OP_RECOVER_DB:
-            case OperationType.OP_RECOVER_TABLE:
-            case OperationType.OP_RECOVER_PARTITION: {
-                RecoverInfo recoverInfo = new RecoverInfo();
-                recoverInfo.readFields(in);
-                ret.setData(recoverInfo);
-                break;
-            }
-            case OperationType.OP_CLEAR_ROLLUP_INFO: {
-                ReplicaPersistInfo info = ReplicaPersistInfo.read(in);
-                ret.setData(info);
-                break;
-            }
-            case OperationType.OP_DROP_ROLLUP: {
-                DropInfo info = DropInfo.read(in);
-                ret.setData(info);
-                break;
-            }
-            case OperationType.OP_BATCH_DROP_ROLLUP: {
-                BatchDropInfo batchDropInfo = BatchDropInfo.read(in);
-                ret.setData(batchDropInfo);
-                break;
-            }
-            case OperationType.OP_RENAME_TABLE:
-            case OperationType.OP_RENAME_ROLLUP:
-            case OperationType.OP_RENAME_PARTITION: {
-                TableInfo info = TableInfo.read(in);
-                ret.setData(info);
-                break;
-            }
-            case OperationType.OP_FINISH_CONSISTENCY_CHECK: {
-                ConsistencyCheckInfo info = ConsistencyCheckInfo.read(in);
-                ret.setData(info);
-                break;
-            }
-            case OperationType.OP_LOAD_START:
-            case OperationType.OP_LOAD_ETL:
-            case OperationType.OP_LOAD_LOADING:
-            case OperationType.OP_LOAD_QUORUM:
-            case OperationType.OP_LOAD_DONE:
-            case OperationType.OP_LOAD_CANCEL: {
-                LoadJob job = new LoadJob();
-                job.readFields(in);
-                ret.setData(job);
-                break;
-            }
-            case OperationType.OP_FINISH_DELETE: {
-                DeleteInfo info = DeleteInfo.read(in);
-                ret.setData(info);
-                break;
-            }
-            case OperationType.OP_ADD_REPLICA:
-            case OperationType.OP_DELETE_REPLICA: {
-                ReplicaPersistInfo info = ReplicaPersistInfo.read(in);
-                ret.setData(info);
-                break;
-            }
-            case OperationType.OP_ADD_BACKEND:
-            case OperationType.OP_DROP_BACKEND:
-            case OperationType.OP_BACKEND_STATE_CHANGE: {
-                Backend be = Backend.read(in);
-                ret.setData(be);
-                break;
-            }
-            case OperationType.OP_ADD_FRONTEND:
-            case OperationType.OP_ADD_FIRST_FRONTEND:
-            case OperationType.OP_REMOVE_FRONTEND: {
-                Frontend fe = Frontend.read(in);
-                ret.setData(fe);
-                break;
-            }
-            case OperationType.OP_SET_LOAD_ERROR_HUB: {
-                LoadErrorHub.Param param = new LoadErrorHub.Param();
-                param.readFields(in);
-                ret.setData(param);
-                break;
-            }
-            case OperationType.OP_MASTER_INFO_CHANGE: {
-                MasterInfo info = new MasterInfo();
-                info.readFields(in);
-                ret.setData(info);
-                break;
-            }
-            case OperationType.OP_TIMESTAMP: {
-                Timestamp stamp = new Timestamp();
-                stamp.readFields(in);
-                ret.setData(stamp);
-                break;
-            }
-            case OperationType.OP_META_VERSION: {
-                Text text = new Text();
-                text.readFields(in);
-                ret.setData(text);
+    @Override
+    public JournalEntity next() {
+        if (currentKey > toKey) {
+            return null;
+        }
+
+        JournalEntity ret = null;
+        try {
+            while (true) {
+                try {
+                    ret = getJournalEntity(currentStream);
+                    if (ret.getOpCode() == OperationType.OP_LOCAL_EOF) {
+                        if (nextFilePositionIndex < editFileSequenceNumbers.size()) {
+                            currentStream.close();
+                            currentStream = new DataInputStream(new BufferedInputStream(new EditLogFileInputStream(
+                                    new File(imageDir,
+                                            "edits." + editFileSequenceNumbers.get(nextFilePositionIndex)))));
+                            nextFilePositionIndex++;
+                            continue;
+                        } else {
+                            return null;
+                        }
+                    }
+                } catch (EOFException e) {
+                    if (nextFilePositionIndex < editFileSequenceNumbers.size()) {
+                        currentStream.close();
+                        currentStream = new DataInputStream(new BufferedInputStream(new EditLogFileInputStream(
+                                new File(imageDir, "edits." + editFileSequenceNumbers.get(nextFilePositionIndex)))));
+                        nextFilePositionIndex++;
+                        continue;
+                    } else {
+                        return null;
+                    }
+                }
                 break;
             }
 
-            default: {
-                throw new IOException("Never seen opcode " + opCode);
+            currentKey++;
+            return ret;
+        } catch (IOException e) {
+            LOG.error("something wrong. {}", e);
+            try {
+                currentStream.close();
+            } catch (IOException e1) {
+                LOG.error(e1);
             }
+            LOG.error(e);
         }
         return ret;
     }
 
+    @Deprecated
+    private JournalEntity getJournalEntity(DataInputStream in) throws IOException {
+        JournalEntity ret = new JournalEntity();
+        ret.readFields(in);
+        return ret;
+    }
+
     @Override
     public void close() {
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/SystemMetrics.java b/fe/fe-core/src/main/java/org/apache/doris/metric/SystemMetrics.java
index 7e138af036..25a865065c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/SystemMetrics.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/SystemMetrics.java
@@ -105,7 +105,7 @@ public class SystemMetrics {
             tcpOutSegs = Long.valueOf(parts[headerMap.get("OutSegs")]);
 
         } catch (Exception e) {
-            LOG.warn("failed to get /proc/net/snmp", e);
+            LOG.warn("failed to get /proc/net/snmp: ", e.getMessage());
         }
     }
 
@@ -141,7 +141,7 @@ public class SystemMetrics {
             buffers = memInfoMap.getOrDefault("Buffers", -1L);
             cached = memInfoMap.getOrDefault("Cached", -1L);
         } catch (Exception e) {
-            LOG.warn("failed to get /proc/meminfo", e);
+            LOG.warn("failed to get /proc/meminfo: ", e.getMessage());
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
index df7f1d856c..9c8bd622da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
@@ -76,7 +76,7 @@ public class MysqlChannel {
                     // avoid calling getHostName() which may trigger a name service reverse lookup
                     remoteHostPortString = address.getHostString() + ":" + address.getPort();
                     remoteIp = address.getAddress().getHostAddress();
-                } else {
+                } else if (channel.getRemoteAddress() != null) {
                     // Reach here, what's it?
                     remoteHostPortString = channel.getRemoteAddress().toString();
                     remoteIp = channel.getRemoteAddress().toString();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 594d8c634e..02a8059a6e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -48,6 +48,7 @@ import org.apache.doris.journal.JournalCursor;
 import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.journal.bdbje.BDBJEJournal;
 import org.apache.doris.journal.bdbje.Timestamp;
+import org.apache.doris.journal.local.LocalJournal;
 import org.apache.doris.load.DeleteHandler;
 import org.apache.doris.load.DeleteInfo;
 import org.apache.doris.load.ExportJob;
@@ -95,7 +96,14 @@ public class EditLog {
     private Journal journal;
 
     public EditLog(String nodeName) {
-        journal = new BDBJEJournal(nodeName);
+        String journalType = Config.edit_log_type;
+        if (journalType.equalsIgnoreCase("bdb")) {
+            journal = new BDBJEJournal(nodeName);
+        } else if (journalType.equalsIgnoreCase("local")) {
+            journal = new LocalJournal(Catalog.getCurrentCatalog().getImageDir());
+        } else {
+            throw new IllegalArgumentException("Unknown edit log type: " + journalType);
+        }
     }
 
     public long getMaxJournalId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
index 802eb2c648..3d8b360704 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
@@ -61,7 +61,7 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
     }
 
     public void write(short op, Writable writable) throws IOException {
-        write(op);
+        bufCurrent.writeShort(op);
         writable.write(bufCurrent);
     }
 
@@ -99,7 +99,7 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
      */
     public void setReadyToFlush() throws IOException {
         assert bufReady.size() == 0 : "previous data is not flushed yet";
-        write(OperationType.OP_INVALID); // insert end-of-file marker
+        write(OperationType.OP_LOCAL_EOF); // insert end-of-file marker
         DataOutputBuffer tmp = bufReady;
         bufReady = bufCurrent;
         bufCurrent = tmp;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 14871828de..168d88ec96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -21,7 +21,8 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 
 public class OperationType {
-    public static final short OP_INVALID = -1;
+    // OP_LOCAL_EOF is only for the local edit log, to indicate the end of an edit log run.
+    public static final short OP_LOCAL_EOF = -1;
     public static final short OP_SAVE_NEXTID = 0;
     public static final short OP_CREATE_DB = 1;
     public static final short OP_DROP_DB = 2;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
index 1dd498beb2..64e34d17dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
@@ -18,6 +18,7 @@
 package org.apache.doris.system;
 
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.ha.BDBHA;
@@ -106,7 +107,7 @@ public class Frontend implements Writable {
     public boolean handleHbResponse(FrontendHbResponse hbResponse, boolean isReplay) {
         boolean isChanged = false;
         if (hbResponse.getStatus() == HbStatus.OK) {
-            if (!isAlive && !isReplay) {
+            if (!isAlive && !isReplay && Config.edit_log_type.equalsIgnoreCase("bdb")) {
                 BDBHA bdbha = (BDBHA) Catalog.getCurrentCatalog().getHaProtocol();
                 bdbha.removeUnReadyElectableNode(nodeName, Catalog.getCurrentCatalog().getFollowerCount());
             }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java
index 37346b2b15..207d926474 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java
@@ -18,6 +18,7 @@
 package org.apache.doris.utframe;
 
 import org.apache.doris.PaloFe;
+import org.apache.doris.PaloFe.StartupOptions;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.util.PrintableMap;
 
@@ -57,7 +58,7 @@ import java.util.Map;
  * There will be 3 directories under running dir/:
  *      running dir/conf/
  *      running dir/log/
- *      running dir/palo-meta/
+ *      running dir/doris-meta/
  *
  *  All these 3 directories will be cleared first.
  *
@@ -66,7 +67,7 @@ public class MockedFrontend {
     public static final String FE_PROCESS = "fe";
 
     // the running dir of this mocked frontend.
-    // log/ palo-meta/ and conf/ dirs will be created under this dir.
+    // log/ doris-meta/ and conf/ dirs will be created under this dir.
     private String runningDir;
     // the min set of fe.conf.
     private static final Map<String, String> MIN_FE_CONF;
@@ -100,7 +101,7 @@ public class MockedFrontend {
 
     // init the fe process. This must be called before starting the frontend process.
     // 1. check if all necessary environment variables are set.
-    // 2. clear and create 3 dirs: runningDir/log/, runningDir/palo-meta/, runningDir/conf/
+    // 2. clear and create 3 dirs: runningDir/log/, runningDir/doris-meta/, runningDir/conf/
     // 3. init fe.conf
     //      The content of "fe.conf" is a merge set of input `feConf` and MIN_FE_CONF
     public void init(String runningDir, Map<String, String> feConf) throws EnvVarNotSetException, IOException {
@@ -121,7 +122,7 @@ public class MockedFrontend {
         // clear and create log dir
         createAndClearDir(runningDir + "/log/");
         // clear and create meta dir
-        createAndClearDir(runningDir + "/palo-meta/");
+        createAndClearDir(runningDir + "/doris-meta/");
         // clear and create conf dir
         createAndClearDir(runningDir + "/conf/");
         // init fe.conf
@@ -134,7 +135,7 @@ public class MockedFrontend {
         finalFeConf = Maps.newHashMap(MIN_FE_CONF);
         // these 2 configs depends on running dir, so set them here.
         finalFeConf.put("LOG_DIR", this.runningDir + "/log");
-        finalFeConf.put("meta_dir", this.runningDir + "/palo-meta");
+        finalFeConf.put("meta_dir", this.runningDir + "/doris-meta");
         finalFeConf.put("sys_log_dir", this.runningDir + "/log");
         finalFeConf.put("audit_log_dir", this.runningDir + "/log");
         finalFeConf.put("tmp_dir", this.runningDir + "/temp_dir");
@@ -183,7 +184,12 @@ public class MockedFrontend {
 
         @Override
         public void run() {
-            PaloFe.start(frontend.getRunningDir(), frontend.getRunningDir(), args);
+            StartupOptions options = new StartupOptions();
+            // For FE unit tests, we don't need these 2 servers,
+            // and starting them also costs time.
+            options.enableHttpServer = false;
+            options.enableQeService = false;
+            PaloFe.start(frontend.getRunningDir(), frontend.getRunningDir(), args, options);
         }
     }
 
@@ -195,8 +201,7 @@ public class MockedFrontend {
         Thread feThread = new Thread(new FERunnable(this, args), FE_PROCESS);
         feThread.start();
         // wait the catalog to be ready until timeout (30 seconds)
-        waitForCatalogReady(120 * 1000);
-        System.out.println("Fe process is started");
+        waitForCatalogReady(30 * 1000);
     }
 
     private void waitForCatalogReady(long timeoutMs) throws FeStartException {
@@ -204,11 +209,11 @@ public class MockedFrontend {
         while (!Catalog.getCurrentCatalog().isReady() && left > 0) {
             System.out.println("catalog is not ready");
             try {
-                Thread.sleep(5000);
+                Thread.sleep(100);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
-            left -= 5000;
+            left -= 100;
         }
 
         if (left <= 0 && !Catalog.getCurrentCatalog().isReady()) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 87498ed605..1abad1631f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -58,6 +58,7 @@ import org.apache.doris.utframe.MockedFrontend.EnvVarNotSetException;
 import org.apache.doris.utframe.MockedFrontend.FeStartException;
 import org.apache.doris.utframe.MockedFrontend.NotInitException;
 
+import com.clearspring.analytics.util.Lists;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -204,6 +205,7 @@ public abstract class TestWithFeService {
         }
         Config.plugin_dir = dorisHome + "/plugins";
         Config.custom_config_dir = dorisHome + "/conf";
+        Config.edit_log_type = "local";
         File file = new File(Config.custom_config_dir);
         if (!file.exists()) {
             file.mkdir();
@@ -237,10 +239,29 @@ public abstract class TestWithFeService {
             throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException,
             InterruptedException {
         int feRpcPort = startFEServer(runningDir);
+        List<Backend> bes = Lists.newArrayList();
+        System.out.println("start create backend");
         for (int i = 0; i < backendNum; i++) {
-            createBackend("127.0.0.1", feRpcPort);
-            // sleep to wait first heartbeat
-            Thread.sleep(6000);
+            bes.add(createBackend("127.0.0.1", feRpcPort));
+        }
+        System.out.println("after create backend");
+        checkBEHeartbeat(bes);
+        // Thread.sleep(2000);
+        System.out.println("after create backend2");
+    }
+
+    private void checkBEHeartbeat(List<Backend> bes) throws InterruptedException {
+        int maxTry = 10;
+        boolean allAlive = false;
+        while (maxTry-- > 0 && !allAlive) {
+            Thread.sleep(1000);
+            boolean hasDead = false;
+            for (Backend be : bes) {
+                if (!be.isAlive()) {
+                    hasDead = true;
+                }
+            }
+            allAlive = !hasDead;
         }
     }
 
@@ -253,31 +274,30 @@ public abstract class TestWithFeService {
         // to make cluster running well.
         FeConstants.runningUnitTest = true;
         int feRpcPort = startFEServer(runningDir);
+        List<Backend> bes = Lists.newArrayList();
         for (int i = 0; i < backendNum; i++) {
             String host = "127.0.0." + (i + 1);
-            createBackend(host, feRpcPort);
+            bes.add(createBackend(host, feRpcPort));
         }
-        // sleep to wait first heartbeat
-        Thread.sleep(6000);
+        checkBEHeartbeat(bes);
     }
 
-    protected void createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException {
+    protected Backend createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException {
         int beHeartbeatPort = findValidPort();
         int beThriftPort = findValidPort();
         int beBrpcPort = findValidPort();
         int beHttpPort = findValidPort();
 
         // start be
-        MockedBackend backend =
-                MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort, beHttpPort,
-                        new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort),
-                        new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
+        MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort,
+                beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort),
+                new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
         backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort));
         backend.start();
 
         // add be
-        Backend be =
-                new Backend(Catalog.getCurrentCatalog().getNextId(), backend.getHost(), backend.getHeartbeatPort());
+        Backend be = new Backend(Catalog.getCurrentCatalog().getNextId(), backend.getHost(),
+                backend.getHeartbeatPort());
         DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
         diskInfo1.setTotalCapacityB(1000000);
         diskInfo1.setAvailableCapacityB(500000);
@@ -285,12 +305,13 @@ public abstract class TestWithFeService {
         Map<String, DiskInfo> disks = Maps.newHashMap();
         disks.put(diskInfo1.getRootPath(), diskInfo1);
         be.setDisks(ImmutableMap.copyOf(disks));
-        be.setAlive(true);
+        be.setAlive(false);
         be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
         be.setBePort(beThriftPort);
         be.setHttpPort(beHttpPort);
         be.setBrpcPort(beBrpcPort);
         Catalog.getCurrentSystemInfo().addBackend(be);
+        return be;
     }
 
     protected void cleanDorisFeDir(String baseDir) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index 4e0a49016f..41f7a9196b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -49,6 +49,7 @@ import org.apache.doris.utframe.MockedFrontend.NotInitException;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FileUtils;
@@ -155,6 +156,7 @@ public class UtFrameUtils {
         }
         Config.plugin_dir = dorisHome + "/plugins";
         Config.custom_config_dir = dorisHome + "/conf";
+        Config.edit_log_type = "local";
         File file = new File(Config.custom_config_dir);
         if (!file.exists()) {
             file.mkdir();
@@ -187,18 +189,36 @@ public class UtFrameUtils {
     public static void createDorisCluster(String runningDir, int backendNum) throws EnvVarNotSetException, IOException,
             FeStartException, NotInitException, DdlException, InterruptedException {
         int feRpcPort = startFEServer(runningDir);
+        List<Backend> bes = Lists.newArrayList();
         for (int i = 0; i < backendNum; i++) {
-            createBackend("127.0.0.1", feRpcPort);
-            // sleep to wait first heartbeat
-            Thread.sleep(6000);
+            bes.add(createBackend("127.0.0.1", feRpcPort));
+        }
+        System.out.println("after create backend");
+        checkBEHeartbeat(bes);
+        // Thread.sleep(2000);
+        System.out.println("after create backend2");
+    }
+
+    private static void checkBEHeartbeat(List<Backend> bes) throws InterruptedException {
+        int maxTry = 10;
+        boolean allAlive = false;
+        while (maxTry-- > 0 && !allAlive) {
+            Thread.sleep(1000);
+            boolean hasDead = false;
+            for (Backend be : bes) {
+                if (!be.isAlive()) {
+                    hasDead = true;
+                }
+            }
+            allAlive = !hasDead;
         }
     }
 
     // Create multi backends with different host for unit test.
     // the host of BE will be "127.0.0.1", "127.0.0.2"
     public static void createDorisClusterWithMultiTag(String runningDir, int backendNum)
-            throws EnvVarNotSetException, IOException, FeStartException,
-            NotInitException, DdlException, InterruptedException {
+            throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException,
+            InterruptedException {
         // set runningUnitTest to true, so that for ut,
         // the agent task will be sent to "127.0.0.1" to make cluster running well.
         FeConstants.runningUnitTest = true;
@@ -211,16 +231,15 @@ public class UtFrameUtils {
         Thread.sleep(6000);
     }
 
-    public static void createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException {
+    public static Backend createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException {
         int beHeartbeatPort = findValidPort();
         int beThriftPort = findValidPort();
         int beBrpcPort = findValidPort();
         int beHttpPort = findValidPort();
 
         // start be
-        MockedBackend backend = MockedBackendFactory.createBackend(beHost,
-                beHeartbeatPort, beThriftPort, beBrpcPort, beHttpPort,
-                new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort),
+        MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort,
+                beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort),
                 new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
         backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort));
         backend.start();
@@ -241,6 +260,7 @@ public class UtFrameUtils {
         be.setHttpPort(beHttpPort);
         be.setBrpcPort(beBrpcPort);
         Catalog.getCurrentSystemInfo().addBackend(be);
+        return be;
     }
 
     public static void cleanDorisFeDir(String baseDir) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org