You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/21 01:12:27 UTC
[doris] branch master updated: [improvement](fe-ut) use local journal to make FE ut run fast (#11038)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d36b927fdb [improvement](fe-ut) use local journal to make FE ut run fast (#11038)
d36b927fdb is described below
commit d36b927fdbc44492f45c28747bbdc7cfd473f23b
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu Jul 21 09:12:21 2022 +0800
[improvement](fe-ut) use local journal to make FE ut run fast (#11038)
* [improvement](fe-ut) use local journal to make FE ut run fast
---
.../src/main/java/org/apache/doris/PaloFe.java | 43 ++--
.../java/org/apache/doris/catalog/Catalog.java | 37 +--
.../main/java/org/apache/doris/common/Config.java | 4 +-
.../org/apache/doris/common/util/MasterDaemon.java | 2 +-
.../org/apache/doris/journal/JournalEntity.java | 5 +
.../apache/doris/journal/local/LocalJournal.java | 12 +-
.../doris/journal/local/LocalJournalCursor.java | 279 +++++----------------
.../org/apache/doris/metric/SystemMetrics.java | 4 +-
.../java/org/apache/doris/mysql/MysqlChannel.java | 2 +-
.../java/org/apache/doris/persist/EditLog.java | 10 +-
.../doris/persist/EditLogFileOutputStream.java | 4 +-
.../org/apache/doris/persist/OperationType.java | 3 +-
.../java/org/apache/doris/system/Frontend.java | 3 +-
.../org/apache/doris/utframe/MockedFrontend.java | 25 +-
.../apache/doris/utframe/TestWithFeService.java | 49 ++--
.../org/apache/doris/utframe/UtFrameUtils.java | 38 ++-
16 files changed, 228 insertions(+), 292 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
index a692789cdf..817c641982 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
@@ -61,11 +61,14 @@ public class PaloFe {
public static final String PID_DIR = System.getenv("PID_DIR");
public static void main(String[] args) {
- start(DORIS_HOME_DIR, PID_DIR, args);
+ StartupOptions options = new StartupOptions();
+ options.enableHttpServer = true;
+ options.enableQeService = true;
+ start(DORIS_HOME_DIR, PID_DIR, args, options);
}
// entrance for doris frontend
- public static void start(String dorisHomeDir, String pidDir, String[] args) {
+ public static void start(String dorisHomeDir, String pidDir, String[] args, StartupOptions options) {
if (System.getenv("DORIS_LOG_TO_STDERR") != null) {
Log4jConfig.foreground = true;
}
@@ -135,24 +138,27 @@ public class PaloFe {
// 1. HttpServer for HTTP Server
// 2. FeServer for Thrift Server
// 3. QeService for MySQL Server
- QeService qeService = new QeService(Config.query_port, Config.mysql_service_nio_enabled,
- ExecuteEnv.getInstance().getScheduler());
FeServer feServer = new FeServer(Config.rpc_port);
-
feServer.start();
- HttpServer httpServer = new HttpServer();
- httpServer.setPort(Config.http_port);
- httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size);
- httpServer.setAcceptors(Config.jetty_server_acceptors);
- httpServer.setSelectors(Config.jetty_server_selectors);
- httpServer.setWorkers(Config.jetty_server_workers);
- httpServer.setMaxThreads(Config.jetty_threadPool_maxThreads);
- httpServer.setMinThreads(Config.jetty_threadPool_minThreads);
- httpServer.setMaxHttpHeaderSize(Config.jetty_server_max_http_header_size);
- httpServer.start();
+ if (options.enableHttpServer) {
+ HttpServer httpServer = new HttpServer();
+ httpServer.setPort(Config.http_port);
+ httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size);
+ httpServer.setAcceptors(Config.jetty_server_acceptors);
+ httpServer.setSelectors(Config.jetty_server_selectors);
+ httpServer.setWorkers(Config.jetty_server_workers);
+ httpServer.setMaxThreads(Config.jetty_threadPool_maxThreads);
+ httpServer.setMinThreads(Config.jetty_threadPool_minThreads);
+ httpServer.setMaxHttpHeaderSize(Config.jetty_server_max_http_header_size);
+ httpServer.start();
+ }
- qeService.start();
+ if (options.enableQeService) {
+ QeService qeService = new QeService(Config.query_port, Config.mysql_service_nio_enabled,
+ ExecuteEnv.getInstance().getScheduler());
+ qeService.start();
+ }
ThreadPoolManager.registerAllThreadPoolMetric();
@@ -372,4 +378,9 @@ public class PaloFe {
throw e;
}
}
+
+ public static class StartupOptions {
+ public boolean enableHttpServer = true;
+ public boolean enableQeService = true;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index d865769094..40a93b1510 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -270,7 +270,7 @@ public class Catalog {
private static final int STATE_CHANGE_CHECK_INTERVAL_MS = 100;
private static final int REPLAY_INTERVAL_MS = 1;
private static final String BDB_DIR = "/bdb";
- private static final String IMAGE_DIR = "/image";
+ public static final String IMAGE_DIR = "/image";
private String metaDir;
private String bdbDir;
@@ -802,13 +802,10 @@ public class Catalog {
if (!bdbDir.exists()) {
bdbDir.mkdirs();
}
-
- File imageDir = new File(this.imageDir);
- if (!imageDir.exists()) {
- imageDir.mkdirs();
- }
- } else {
- throw new Exception("Invalid edit log type: " + Config.edit_log_type);
+ }
+ File imageDir = new File(this.imageDir);
+ if (!imageDir.exists()) {
+ imageDir.mkdirs();
}
// init plugin manager
@@ -834,18 +831,27 @@ public class Catalog {
// 6. start state listener thread
createStateListener();
listener.start();
+
+ if (!Config.edit_log_type.equalsIgnoreCase("bdb")) {
+ // If not using bdb, we need to notify the FE type transfer manually.
+ notifyNewFETypeTransfer(FrontendNodeType.MASTER);
+ }
}
// wait until FE is ready.
public void waitForReady() throws InterruptedException {
+ long counter = 0;
while (true) {
if (isReady()) {
LOG.info("catalog is ready. FE type: {}", feType);
break;
}
- Thread.sleep(2000);
- LOG.info("wait catalog to be ready. FE type: {}. is ready: {}", feType, isReady.get());
+ Thread.sleep(100);
+ if (counter++ % 20 == 0) {
+ LOG.info("wait catalog to be ready. FE type: {}. is ready: {}, counter: {}", feType, isReady.get(),
+ counter);
+ }
}
}
@@ -1224,9 +1230,11 @@ public class Catalog {
editLog.open();
- if (!haProtocol.fencing()) {
- LOG.error("fencing failed. will exit.");
- System.exit(-1);
+ if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
+ if (!haProtocol.fencing()) {
+ LOG.error("fencing failed. will exit.");
+ System.exit(-1);
+ }
}
long replayStartTime = System.currentTimeMillis();
@@ -1285,7 +1293,6 @@ public class Catalog {
canRead.set(true);
isReady.set(true);
-
checkLowerCaseTableNames();
String msg = "master finished to replay journal, can write now.";
@@ -1404,7 +1411,7 @@ public class Catalog {
// transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER
// add helper sockets
- if (Config.edit_log_type.equalsIgnoreCase("BDB")) {
+ if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
for (Frontend fe : frontends.values()) {
if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) {
((BDBHA) getHaProtocol()).addHelperSocket(fe.getHost(), fe.getEditLogPort());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 7367f97e8f..289c4d24bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -188,10 +188,10 @@ public class Config extends ConfigBase {
/**
* Edit log type.
* BDB: write log to bdbje
- * LOCAL: deprecated.
+ * LOCAL: use local file to save edit log, only used for unit test
*/
@ConfField
- public static String edit_log_type = "BDB";
+ public static String edit_log_type = "bdb";
/**
* bdbje port
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/MasterDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/MasterDaemon.java
index 103880ef99..8d1aa8ebdb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/MasterDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/MasterDaemon.java
@@ -49,7 +49,7 @@ public class MasterDaemon extends Daemon {
try {
// not return, but sleep a while. to avoid some thread with large running interval will
// wait for a long time to start again.
- Thread.sleep(10 * 1000);
+ Thread.sleep(100);
} catch (InterruptedException e) {
LOG.warn("interrupted exception. thread: {}", getName(), e);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 0788662622..943315def0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -147,6 +147,11 @@ public class JournalEntity implements Writable {
boolean isRead = false;
LOG.debug("get opcode: {}", opCode);
switch (opCode) {
+ case OperationType.OP_LOCAL_EOF: {
+ data = null;
+ isRead = true;
+ break;
+ }
case OperationType.OP_SAVE_NEXTID: {
data = new Text();
((Text) data).readFields(in);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
index e656955dba..e54237a7bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
@@ -25,6 +25,7 @@ import org.apache.doris.persist.EditLogFileOutputStream;
import org.apache.doris.persist.EditLogOutputStream;
import org.apache.doris.persist.Storage;
+import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -54,10 +55,18 @@ public class LocalJournal implements Journal {
this.journalId.set(getCurrentJournalId(storage.getEditsFileSequenceNumbers()));
long id = journalId.get();
- if (id == storage.getEditsSeq()) {
+ if (storage.getEditsSeq() == 0) {
+ // there is no edits file, create first one
+ Preconditions.checkState(id == 1, id);
+ currentEditFile = new File(imageDir, "edits.1");
+ currentEditFile.createNewFile();
+ outputStream = new EditLogFileOutputStream(currentEditFile);
+ } else if (id == storage.getEditsSeq()) {
+ // there exist edits files, point to the latest one and set position to the end of file.
this.currentEditFile = storage.getEditsFile(id);
this.outputStream = new EditLogFileOutputStream(currentEditFile);
} else {
+ // create next edits file
currentEditFile = new File(imageDir, "edits." + (id + 1));
currentEditFile.createNewFile();
outputStream = new EditLogFileOutputStream(currentEditFile);
@@ -78,6 +87,7 @@ public class LocalJournal implements Journal {
return;
}
if (outputStream != null) {
+ outputStream.flush();
outputStream.close();
}
currentEditFile = new File(imageDir, "edits." + journalId.get());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java
index ffef5bb74e..8031a2c991 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java
@@ -17,33 +17,11 @@
package org.apache.doris.journal.local;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.common.io.Text;
-import org.apache.doris.ha.MasterInfo;
import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
-import org.apache.doris.journal.bdbje.Timestamp;
-import org.apache.doris.load.DeleteInfo;
-import org.apache.doris.load.LoadErrorHub;
-import org.apache.doris.load.LoadJob;
-import org.apache.doris.persist.BatchDropInfo;
-import org.apache.doris.persist.BatchModifyPartitionsInfo;
-import org.apache.doris.persist.ConsistencyCheckInfo;
-import org.apache.doris.persist.CreateTableInfo;
-import org.apache.doris.persist.DatabaseInfo;
-import org.apache.doris.persist.DropInfo;
-import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.EditLogFileInputStream;
-import org.apache.doris.persist.ModifyPartitionInfo;
import org.apache.doris.persist.OperationType;
-import org.apache.doris.persist.PartitionPersistInfo;
-import org.apache.doris.persist.RecoverInfo;
-import org.apache.doris.persist.RefreshExternalTableInfo;
-import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.persist.Storage;
-import org.apache.doris.persist.TableInfo;
-import org.apache.doris.system.Backend;
-import org.apache.doris.system.Frontend;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -55,7 +33,10 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
-@Deprecated
+/**
+ * For unit test only.
+ * Uses local files to save edit logs.
+ */
public final class LocalJournalCursor implements JournalCursor {
private static final Logger LOG = LogManager.getLogger(LocalJournalCursor.class);
private String imageDir;
@@ -121,35 +102,29 @@ public final class LocalJournalCursor implements JournalCursor {
new EditLogFileInputStream(new File(imageDir, "edits." + fileName))));
while (scannedKey < fromKey) {
- short opCode = currentStream.readShort();
- if (opCode == OperationType.OP_INVALID) {
- System.out.println("Can not find the key:" + fromKey);
- throw new IOException();
- }
- getJournalEntity(currentStream, opCode);
+ getJournalEntity(currentStream);
scannedKey++;
}
}
- @Override
- public JournalEntity next() {
+ public JournalEntity next2() {
if (currentKey > toKey) {
return null;
}
JournalEntity ret = null;
try {
- short opCode = OperationType.OP_INVALID;
+ short opCode = OperationType.OP_LOCAL_EOF;
while (true) {
try {
opCode = currentStream.readShort();
- if (opCode == OperationType.OP_INVALID) {
+ if (opCode == OperationType.OP_LOCAL_EOF) {
if (nextFilePositionIndex < editFileSequenceNumbers.size()) {
currentStream.close();
currentStream = new DataInputStream(new BufferedInputStream(new EditLogFileInputStream(
- new File(imageDir, "edits." + editFileSequenceNumbers
- .get(nextFilePositionIndex)))));
+ new File(imageDir,
+ "edits." + editFileSequenceNumbers.get(nextFilePositionIndex)))));
nextFilePositionIndex++;
continue;
} else {
@@ -159,9 +134,8 @@ public final class LocalJournalCursor implements JournalCursor {
} catch (EOFException e) {
if (nextFilePositionIndex < editFileSequenceNumbers.size()) {
currentStream.close();
- currentStream = new DataInputStream(
- new BufferedInputStream(new EditLogFileInputStream(new File(
- imageDir, "edits." + editFileSequenceNumbers.get(nextFilePositionIndex)))));
+ currentStream = new DataInputStream(new BufferedInputStream(new EditLogFileInputStream(
+ new File(imageDir, "edits." + editFileSequenceNumbers.get(nextFilePositionIndex)))));
nextFilePositionIndex++;
continue;
} else {
@@ -171,7 +145,7 @@ public final class LocalJournalCursor implements JournalCursor {
break;
}
- ret = getJournalEntity(currentStream, opCode);
+ ret = getJournalEntity(currentStream);
currentKey++;
return ret;
} catch (IOException e) {
@@ -186,191 +160,64 @@ public final class LocalJournalCursor implements JournalCursor {
return ret;
}
- @Deprecated
- private JournalEntity getJournalEntity(DataInputStream in, short opCode) throws IOException {
- JournalEntity ret = new JournalEntity();
- ret.setOpCode(opCode);
- switch (opCode) {
- case OperationType.OP_SAVE_NEXTID: {
- Text text = new Text();
- text.readFields(in);
- ret.setData(text);
- break;
- }
- case OperationType.OP_SAVE_TRANSACTION_ID: {
- Text text = new Text();
- text.readFields(in);
- ret.setData(text);
- break;
- }
- case OperationType.OP_CREATE_DB: {
- Database db = new Database();
- db.readFields(in);
- ret.setData(db);
- break;
- }
- case OperationType.OP_DROP_DB: {
- Text text = new Text();
- text.readFields(in);
- ret.setData(text);
- break;
- }
- case OperationType.OP_ALTER_DB:
- case OperationType.OP_RENAME_DB: {
- DatabaseInfo info = new DatabaseInfo();
- info.readFields(in);
- ret.setData(info);
- break;
- }
- case OperationType.OP_CREATE_TABLE: {
- CreateTableInfo info = new CreateTableInfo();
- info.readFields(in);
- ret.setData(info);
- break;
- }
- case OperationType.OP_DROP_TABLE: {
- DropInfo info = new DropInfo();
- info.readFields(in);
- ret.setData(info);
- break;
- }
- case OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA: {
- RefreshExternalTableInfo info = RefreshExternalTableInfo.read(in);
- ret.setData(info);
- break;
- }
- case OperationType.OP_ADD_PARTITION: {
- PartitionPersistInfo info = new PartitionPersistInfo();
- info.readFields(in);
- ret.setData(info);
- break;
- }
- case OperationType.OP_DROP_PARTITION: {
- DropPartitionInfo info = DropPartitionInfo.read(in);
- ret.setData(info);
- break;
- }
- case OperationType.OP_MODIFY_PARTITION: {
- ModifyPartitionInfo info = ModifyPartitionInfo.read(in);
- ret.setData(info);
- break;
- }
- case OperationType.OP_BATCH_MODIFY_PARTITION: {
- BatchModifyPartitionsInfo info = BatchModifyPartitionsInfo.read(in);
- ret.setData(info);
- break;
- }
- case OperationType.OP_ERASE_DB:
- case OperationType.OP_ERASE_TABLE:
- case OperationType.OP_ERASE_PARTITION: {
- Text text = new Text();
- text.readFields(in);
- ret.setData(text);
- break;
- }
- case OperationType.OP_RECOVER_DB:
- case OperationType.OP_RECOVER_TABLE:
- case OperationType.OP_RECOVER_PARTITION: {
- RecoverInfo recoverInfo = new RecoverInfo();
- recoverInfo.readFields(in);
- ret.setData(recoverInfo);
- break;
- }
- case OperationType.OP_CLEAR_ROLLUP_INFO: {
- ReplicaPersistInfo info = ReplicaPersistInfo.read(in);
- ret.setData(info);
- break;
- }
- case OperationType.OP_DROP_ROLLUP: {
- DropInfo info = DropInfo.read(in);
- ret.setData(info);
- break;
- }
- case OperationType.OP_BATCH_DROP_ROLLUP: {
- BatchDropInfo batchDropInfo = BatchDropInfo.read(in);
- ret.setData(batchDropInfo);
- break;
- }
- case OperationType.OP_RENAME_TABLE:
- case OperationType.OP_RENAME_ROLLUP:
- case OperationType.OP_RENAME_PARTITION: {
- TableInfo info = TableInfo.read(in);
- ret.setData(info);
- break;
- }
- case OperationType.OP_FINISH_CONSISTENCY_CHECK: {
- ConsistencyCheckInfo info = ConsistencyCheckInfo.read(in);
- ret.setData(info);
- break;
- }
- case OperationType.OP_LOAD_START:
- case OperationType.OP_LOAD_ETL:
- case OperationType.OP_LOAD_LOADING:
- case OperationType.OP_LOAD_QUORUM:
- case OperationType.OP_LOAD_DONE:
- case OperationType.OP_LOAD_CANCEL: {
- LoadJob job = new LoadJob();
- job.readFields(in);
- ret.setData(job);
- break;
- }
- case OperationType.OP_FINISH_DELETE: {
- DeleteInfo info = DeleteInfo.read(in);
- ret.setData(info);
- break;
- }
- case OperationType.OP_ADD_REPLICA:
- case OperationType.OP_DELETE_REPLICA: {
- ReplicaPersistInfo info = ReplicaPersistInfo.read(in);
- ret.setData(info);
- break;
- }
- case OperationType.OP_ADD_BACKEND:
- case OperationType.OP_DROP_BACKEND:
- case OperationType.OP_BACKEND_STATE_CHANGE: {
- Backend be = Backend.read(in);
- ret.setData(be);
- break;
- }
- case OperationType.OP_ADD_FRONTEND:
- case OperationType.OP_ADD_FIRST_FRONTEND:
- case OperationType.OP_REMOVE_FRONTEND: {
- Frontend fe = Frontend.read(in);
- ret.setData(fe);
- break;
- }
- case OperationType.OP_SET_LOAD_ERROR_HUB: {
- LoadErrorHub.Param param = new LoadErrorHub.Param();
- param.readFields(in);
- ret.setData(param);
- break;
- }
- case OperationType.OP_MASTER_INFO_CHANGE: {
- MasterInfo info = new MasterInfo();
- info.readFields(in);
- ret.setData(info);
- break;
- }
- case OperationType.OP_TIMESTAMP: {
- Timestamp stamp = new Timestamp();
- stamp.readFields(in);
- ret.setData(stamp);
- break;
- }
- case OperationType.OP_META_VERSION: {
- Text text = new Text();
- text.readFields(in);
- ret.setData(text);
+ @Override
+ public JournalEntity next() {
+ if (currentKey > toKey) {
+ return null;
+ }
+
+ JournalEntity ret = null;
+ try {
+ while (true) {
+ try {
+ ret = getJournalEntity(currentStream);
+ if (ret.getOpCode() == OperationType.OP_LOCAL_EOF) {
+ if (nextFilePositionIndex < editFileSequenceNumbers.size()) {
+ currentStream.close();
+ currentStream = new DataInputStream(new BufferedInputStream(new EditLogFileInputStream(
+ new File(imageDir,
+ "edits." + editFileSequenceNumbers.get(nextFilePositionIndex)))));
+ nextFilePositionIndex++;
+ continue;
+ } else {
+ return null;
+ }
+ }
+ } catch (EOFException e) {
+ if (nextFilePositionIndex < editFileSequenceNumbers.size()) {
+ currentStream.close();
+ currentStream = new DataInputStream(new BufferedInputStream(new EditLogFileInputStream(
+ new File(imageDir, "edits." + editFileSequenceNumbers.get(nextFilePositionIndex)))));
+ nextFilePositionIndex++;
+ continue;
+ } else {
+ return null;
+ }
+ }
break;
}
- default: {
- throw new IOException("Never seen opcode " + opCode);
+ currentKey++;
+ return ret;
+ } catch (IOException e) {
+ LOG.error("something wrong. {}", e);
+ try {
+ currentStream.close();
+ } catch (IOException e1) {
+ LOG.error(e1);
}
+ LOG.error(e);
}
return ret;
}
+ @Deprecated
+ private JournalEntity getJournalEntity(DataInputStream in) throws IOException {
+ JournalEntity ret = new JournalEntity();
+ ret.readFields(in);
+ return ret;
+ }
+
@Override
public void close() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/SystemMetrics.java b/fe/fe-core/src/main/java/org/apache/doris/metric/SystemMetrics.java
index 7e138af036..25a865065c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/SystemMetrics.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/SystemMetrics.java
@@ -105,7 +105,7 @@ public class SystemMetrics {
tcpOutSegs = Long.valueOf(parts[headerMap.get("OutSegs")]);
} catch (Exception e) {
- LOG.warn("failed to get /proc/net/snmp", e);
+ LOG.warn("failed to get /proc/net/snmp: ", e.getMessage());
}
}
@@ -141,7 +141,7 @@ public class SystemMetrics {
buffers = memInfoMap.getOrDefault("Buffers", -1L);
cached = memInfoMap.getOrDefault("Cached", -1L);
} catch (Exception e) {
- LOG.warn("failed to get /proc/meminfo", e);
+ LOG.warn("failed to get /proc/meminfo: ", e.getMessage());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
index df7f1d856c..9c8bd622da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
@@ -76,7 +76,7 @@ public class MysqlChannel {
// avoid calling getHostName() which may trigger a name service reverse lookup
remoteHostPortString = address.getHostString() + ":" + address.getPort();
remoteIp = address.getAddress().getHostAddress();
- } else {
+ } else if (channel.getRemoteAddress() != null) {
// Reach here, what's it?
remoteHostPortString = channel.getRemoteAddress().toString();
remoteIp = channel.getRemoteAddress().toString();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 594d8c634e..02a8059a6e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -48,6 +48,7 @@ import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.journal.bdbje.BDBJEJournal;
import org.apache.doris.journal.bdbje.Timestamp;
+import org.apache.doris.journal.local.LocalJournal;
import org.apache.doris.load.DeleteHandler;
import org.apache.doris.load.DeleteInfo;
import org.apache.doris.load.ExportJob;
@@ -95,7 +96,14 @@ public class EditLog {
private Journal journal;
public EditLog(String nodeName) {
- journal = new BDBJEJournal(nodeName);
+ String journalType = Config.edit_log_type;
+ if (journalType.equalsIgnoreCase("bdb")) {
+ journal = new BDBJEJournal(nodeName);
+ } else if (journalType.equalsIgnoreCase("local")) {
+ journal = new LocalJournal(Catalog.getCurrentCatalog().getImageDir());
+ } else {
+ throw new IllegalArgumentException("Unknown edit log type: " + journalType);
+ }
}
public long getMaxJournalId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
index 802eb2c648..3d8b360704 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
@@ -61,7 +61,7 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
}
public void write(short op, Writable writable) throws IOException {
- write(op);
+ bufCurrent.writeShort(op);
writable.write(bufCurrent);
}
@@ -99,7 +99,7 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
*/
public void setReadyToFlush() throws IOException {
assert bufReady.size() == 0 : "previous data is not flushed yet";
- write(OperationType.OP_INVALID); // insert end-of-file marker
+ write(OperationType.OP_LOCAL_EOF); // insert end-of-file marker
DataOutputBuffer tmp = bufReady;
bufReady = bufCurrent;
bufCurrent = tmp;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 14871828de..168d88ec96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -21,7 +21,8 @@ import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
public class OperationType {
- public static final short OP_INVALID = -1;
+ // OP_LOCAL_EOF is only for local edit log, to indicate the end of an edit log run.
+ public static final short OP_LOCAL_EOF = -1;
public static final short OP_SAVE_NEXTID = 0;
public static final short OP_CREATE_DB = 1;
public static final short OP_DROP_DB = 2;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
index 1dd498beb2..64e34d17dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
@@ -18,6 +18,7 @@
package org.apache.doris.system;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.ha.BDBHA;
@@ -106,7 +107,7 @@ public class Frontend implements Writable {
public boolean handleHbResponse(FrontendHbResponse hbResponse, boolean isReplay) {
boolean isChanged = false;
if (hbResponse.getStatus() == HbStatus.OK) {
- if (!isAlive && !isReplay) {
+ if (!isAlive && !isReplay && Config.edit_log_type.equalsIgnoreCase("bdb")) {
BDBHA bdbha = (BDBHA) Catalog.getCurrentCatalog().getHaProtocol();
bdbha.removeUnReadyElectableNode(nodeName, Catalog.getCurrentCatalog().getFollowerCount());
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java
index 37346b2b15..207d926474 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java
@@ -18,6 +18,7 @@
package org.apache.doris.utframe;
import org.apache.doris.PaloFe;
+import org.apache.doris.PaloFe.StartupOptions;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.util.PrintableMap;
@@ -57,7 +58,7 @@ import java.util.Map;
* There will be 3 directories under running dir/:
* running dir/conf/
* running dir/log/
- * running dir/palo-meta/
+ * running dir/doris-meta/
*
* All these 3 directories will be cleared first.
*
@@ -66,7 +67,7 @@ public class MockedFrontend {
public static final String FE_PROCESS = "fe";
// the running dir of this mocked frontend.
- // log/ palo-meta/ and conf/ dirs will be created under this dir.
+ // log/ doris-meta/ and conf/ dirs will be created under this dir.
private String runningDir;
// the min set of fe.conf.
private static final Map<String, String> MIN_FE_CONF;
@@ -100,7 +101,7 @@ public class MockedFrontend {
// init the fe process. This must be called before starting the frontend process.
// 1. check if all necessary environment variables are set.
- // 2. clear and create 3 dirs: runningDir/log/, runningDir/palo-meta/, runningDir/conf/
+ // 2. clear and create 3 dirs: runningDir/log/, runningDir/doris-meta/, runningDir/conf/
// 3. init fe.conf
// The content of "fe.conf" is a merge set of input `feConf` and MIN_FE_CONF
public void init(String runningDir, Map<String, String> feConf) throws EnvVarNotSetException, IOException {
@@ -121,7 +122,7 @@ public class MockedFrontend {
// clear and create log dir
createAndClearDir(runningDir + "/log/");
// clear and create meta dir
- createAndClearDir(runningDir + "/palo-meta/");
+ createAndClearDir(runningDir + "/doris-meta/");
// clear and create conf dir
createAndClearDir(runningDir + "/conf/");
// init fe.conf
@@ -134,7 +135,7 @@ public class MockedFrontend {
finalFeConf = Maps.newHashMap(MIN_FE_CONF);
// these 2 configs depends on running dir, so set them here.
finalFeConf.put("LOG_DIR", this.runningDir + "/log");
- finalFeConf.put("meta_dir", this.runningDir + "/palo-meta");
+ finalFeConf.put("meta_dir", this.runningDir + "/doris-meta");
finalFeConf.put("sys_log_dir", this.runningDir + "/log");
finalFeConf.put("audit_log_dir", this.runningDir + "/log");
finalFeConf.put("tmp_dir", this.runningDir + "/temp_dir");
@@ -183,7 +184,12 @@ public class MockedFrontend {
@Override
public void run() {
- PaloFe.start(frontend.getRunningDir(), frontend.getRunningDir(), args);
+ StartupOptions options = new StartupOptions();
+ // For FE unit tests, we don't need these 2 servers.
+ // Starting them also costs time.
+ options.enableHttpServer = false;
+ options.enableQeService = false;
+ PaloFe.start(frontend.getRunningDir(), frontend.getRunningDir(), args, options);
}
}
@@ -195,8 +201,7 @@ public class MockedFrontend {
Thread feThread = new Thread(new FERunnable(this, args), FE_PROCESS);
feThread.start();
// wait for the catalog to be ready until timeout (30 seconds)
- waitForCatalogReady(120 * 1000);
- System.out.println("Fe process is started");
+ waitForCatalogReady(30 * 1000);
}
private void waitForCatalogReady(long timeoutMs) throws FeStartException {
@@ -204,11 +209,11 @@ public class MockedFrontend {
while (!Catalog.getCurrentCatalog().isReady() && left > 0) {
System.out.println("catalog is not ready");
try {
- Thread.sleep(5000);
+ Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
- left -= 5000;
+ left -= 100;
}
if (left <= 0 && !Catalog.getCurrentCatalog().isReady()) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 87498ed605..1abad1631f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -58,6 +58,7 @@ import org.apache.doris.utframe.MockedFrontend.EnvVarNotSetException;
import org.apache.doris.utframe.MockedFrontend.FeStartException;
import org.apache.doris.utframe.MockedFrontend.NotInitException;
+import com.clearspring.analytics.util.Lists;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -204,6 +205,7 @@ public abstract class TestWithFeService {
}
Config.plugin_dir = dorisHome + "/plugins";
Config.custom_config_dir = dorisHome + "/conf";
+ Config.edit_log_type = "local";
File file = new File(Config.custom_config_dir);
if (!file.exists()) {
file.mkdir();
@@ -237,10 +239,29 @@ public abstract class TestWithFeService {
throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException,
InterruptedException {
int feRpcPort = startFEServer(runningDir);
+ List<Backend> bes = Lists.newArrayList();
+ System.out.println("start create backend");
for (int i = 0; i < backendNum; i++) {
- createBackend("127.0.0.1", feRpcPort);
- // sleep to wait first heartbeat
- Thread.sleep(6000);
+ bes.add(createBackend("127.0.0.1", feRpcPort));
+ }
+ System.out.println("after create backend");
+ checkBEHeartbeat(bes);
+ // Thread.sleep(2000);
+ System.out.println("after create backend2");
+ }
+
+ private void checkBEHeartbeat(List<Backend> bes) throws InterruptedException {
+ int maxTry = 10;
+ boolean allAlive = false;
+ while (maxTry-- > 0 && !allAlive) {
+ Thread.sleep(1000);
+ boolean hasDead = false;
+ for (Backend be : bes) {
+ if (!be.isAlive()) {
+ hasDead = true;
+ }
+ }
+ allAlive = !hasDead;
}
}
@@ -253,31 +274,30 @@ public abstract class TestWithFeService {
// to make cluster running well.
FeConstants.runningUnitTest = true;
int feRpcPort = startFEServer(runningDir);
+ List<Backend> bes = Lists.newArrayList();
for (int i = 0; i < backendNum; i++) {
String host = "127.0.0." + (i + 1);
- createBackend(host, feRpcPort);
+ bes.add(createBackend(host, feRpcPort));
}
- // sleep to wait first heartbeat
- Thread.sleep(6000);
+ checkBEHeartbeat(bes);
}
- protected void createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException {
+ protected Backend createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException {
int beHeartbeatPort = findValidPort();
int beThriftPort = findValidPort();
int beBrpcPort = findValidPort();
int beHttpPort = findValidPort();
// start be
- MockedBackend backend =
- MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort, beHttpPort,
- new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort),
- new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
+ MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort,
+ beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort),
+ new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort));
backend.start();
// add be
- Backend be =
- new Backend(Catalog.getCurrentCatalog().getNextId(), backend.getHost(), backend.getHeartbeatPort());
+ Backend be = new Backend(Catalog.getCurrentCatalog().getNextId(), backend.getHost(),
+ backend.getHeartbeatPort());
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
diskInfo1.setTotalCapacityB(1000000);
diskInfo1.setAvailableCapacityB(500000);
@@ -285,12 +305,13 @@ public abstract class TestWithFeService {
Map<String, DiskInfo> disks = Maps.newHashMap();
disks.put(diskInfo1.getRootPath(), diskInfo1);
be.setDisks(ImmutableMap.copyOf(disks));
- be.setAlive(true);
+ be.setAlive(false);
be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
be.setBePort(beThriftPort);
be.setHttpPort(beHttpPort);
be.setBrpcPort(beBrpcPort);
Catalog.getCurrentSystemInfo().addBackend(be);
+ return be;
}
protected void cleanDorisFeDir(String baseDir) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index 4e0a49016f..41f7a9196b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -49,6 +49,7 @@ import org.apache.doris.utframe.MockedFrontend.NotInitException;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
@@ -155,6 +156,7 @@ public class UtFrameUtils {
}
Config.plugin_dir = dorisHome + "/plugins";
Config.custom_config_dir = dorisHome + "/conf";
+ Config.edit_log_type = "local";
File file = new File(Config.custom_config_dir);
if (!file.exists()) {
file.mkdir();
@@ -187,18 +189,36 @@ public class UtFrameUtils {
public static void createDorisCluster(String runningDir, int backendNum) throws EnvVarNotSetException, IOException,
FeStartException, NotInitException, DdlException, InterruptedException {
int feRpcPort = startFEServer(runningDir);
+ List<Backend> bes = Lists.newArrayList();
for (int i = 0; i < backendNum; i++) {
- createBackend("127.0.0.1", feRpcPort);
- // sleep to wait first heartbeat
- Thread.sleep(6000);
+ bes.add(createBackend("127.0.0.1", feRpcPort));
+ }
+ System.out.println("after create backend");
+ checkBEHeartbeat(bes);
+ // Thread.sleep(2000);
+ System.out.println("after create backend2");
+ }
+
+ private static void checkBEHeartbeat(List<Backend> bes) throws InterruptedException {
+ int maxTry = 10;
+ boolean allAlive = false;
+ while (maxTry-- > 0 && !allAlive) {
+ Thread.sleep(1000);
+ boolean hasDead = false;
+ for (Backend be : bes) {
+ if (!be.isAlive()) {
+ hasDead = true;
+ }
+ }
+ allAlive = !hasDead;
}
}
// Create multi backends with different host for unit test.
// the host of BE will be "127.0.0.1", "127.0.0.2"
public static void createDorisClusterWithMultiTag(String runningDir, int backendNum)
- throws EnvVarNotSetException, IOException, FeStartException,
- NotInitException, DdlException, InterruptedException {
+ throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException,
+ InterruptedException {
// set runningUnitTest to true, so that for ut,
// the agent task will be sent to "127.0.0.1" to make cluster running well.
FeConstants.runningUnitTest = true;
@@ -211,16 +231,15 @@ public class UtFrameUtils {
Thread.sleep(6000);
}
- public static void createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException {
+ public static Backend createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException {
int beHeartbeatPort = findValidPort();
int beThriftPort = findValidPort();
int beBrpcPort = findValidPort();
int beHttpPort = findValidPort();
// start be
- MockedBackend backend = MockedBackendFactory.createBackend(beHost,
- beHeartbeatPort, beThriftPort, beBrpcPort, beHttpPort,
- new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort),
+ MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort,
+ beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort),
new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort));
backend.start();
@@ -241,6 +260,7 @@ public class UtFrameUtils {
be.setHttpPort(beHttpPort);
be.setBrpcPort(beBrpcPort);
Catalog.getCurrentSystemInfo().addBackend(be);
+ return be;
}
public static void cleanDorisFeDir(String baseDir) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org