You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2019/10/17 06:55:44 UTC
[incubator-doris] branch branch-0.11 updated: Limit the memory
consumption of broker scan node (#1996)
This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch branch-0.11
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/branch-0.11 by this push:
new 4ef5a8c Limit the memory consumption of broker scan node (#1996)
4ef5a8c is described below
commit 4ef5a8c8560351d7fff7ff8fd51c4c7a75e006a8
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Thu Oct 17 14:40:16 2019 +0800
Limit the memory consumption of broker scan node (#1996)
If memory usage exceeds the limit, no more row batches will be pushed to the batch queue.
---
be/src/exec/broker_scan_node.cpp | 8 +++++--
be/src/exec/olap_scan_node.cpp | 2 +-
be/src/olap/task/engine_storage_migration_task.cpp | 2 +-
.../main/java/org/apache/doris/alter/AlterJob.java | 4 ++--
.../org/apache/doris/analysis/DateLiteral.java | 3 +--
.../java/org/apache/doris/analysis/InsertStmt.java | 28 +++++++++++++++-------
.../org/apache/doris/clone/TabletScheduler.java | 3 ++-
.../apache/doris/load/loadv2/BrokerLoadJob.java | 9 ++++---
.../java/org/apache/doris/qe/StmtExecutor.java | 14 ++++-------
.../java/org/apache/doris/rewrite/FEFunctions.java | 14 +++++++++--
.../doris/transaction/GlobalTransactionMgr.java | 2 +-
.../org/apache/doris/rewrite/FEFunctionsTest.java | 1 +
12 files changed, 58 insertions(+), 32 deletions(-)
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 5e9fccf..242b5cb 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -39,7 +39,7 @@ BrokerScanNode::BrokerScanNode(
_tuple_desc(nullptr),
_num_running_scanners(0),
_scan_finished(false),
- _max_buffered_batches(1024),
+ _max_buffered_batches(32),
_wait_scanner_timer(nullptr) {
}
@@ -360,7 +360,11 @@ Status BrokerScanNode::scanner_scan(
while (_process_status.ok() &&
!_scan_finished.load() &&
!_runtime_state->is_cancelled() &&
- _batch_queue.size() >= _max_buffered_batches) {
+ // stop pushing more batch if
+ // 1. too many batches in queue, or
+ // 2. at least one batch in queue and memory exceed limit.
+ (_batch_queue.size() >= _max_buffered_batches
+ || (mem_tracker()->limit_exceeded() && !_batch_queue.empty()))) {
_queue_writer_cond.wait_for(l, std::chrono::seconds(1));
}
// Process already set failed, so we just return OK
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 4acddb4..1310224 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -1199,7 +1199,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
row_batch->set_scanner_id(scanner->id());
status = scanner->get_batch(_runtime_state, row_batch, &eos);
if (!status.ok()) {
- LOG(WARNING) << "Scan thread read OlapScanner failed!";
+ LOG(WARNING) << "Scan thread read OlapScanner failed: " << status.to_string();
eos = true;
break;
}
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp
index d6da60d..8d468b7 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -110,7 +110,7 @@ OLAPStatus EngineStorageMigrationTask::_storage_medium_migrate(
}
tablet->release_header_lock();
- // generate schema hash path where files will be migrated
+ // get a random store of specified storage medium
auto stores = StorageEngine::instance()->get_stores_for_create_tablet(storage_medium);
if (stores.empty()) {
res = OLAP_ERR_INVALID_ROOT_PATH;
diff --git a/fe/src/main/java/org/apache/doris/alter/AlterJob.java b/fe/src/main/java/org/apache/doris/alter/AlterJob.java
index 91ae084..3a9992b 100644
--- a/fe/src/main/java/org/apache/doris/alter/AlterJob.java
+++ b/fe/src/main/java/org/apache/doris/alter/AlterJob.java
@@ -289,8 +289,8 @@ public abstract class AlterJob implements Writable {
if (isPreviousLoadFinished) {
return true;
} else {
- isPreviousLoadFinished = Catalog.getCurrentGlobalTransactionMgr()
- .isPreviousTransactionsFinished(transactionId, dbId);
+ isPreviousLoadFinished = Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
+ transactionId, dbId);
return isPreviousLoadFinished;
}
}
diff --git a/fe/src/main/java/org/apache/doris/analysis/DateLiteral.java b/fe/src/main/java/org/apache/doris/analysis/DateLiteral.java
index 4fdb3bd..9d2b58a 100644
--- a/fe/src/main/java/org/apache/doris/analysis/DateLiteral.java
+++ b/fe/src/main/java/org/apache/doris/analysis/DateLiteral.java
@@ -203,7 +203,7 @@ public class DateLiteral extends LiteralExpr {
second = dateTime.getSecondOfMinute();
this.type = type;
} catch (Exception ex) {
- throw new AnalysisException("date literal [" + s + "] is valid");
+ throw new AnalysisException("date literal [" + s + "] is invalid");
}
}
@@ -542,7 +542,6 @@ public class DateLiteral extends LiteralExpr {
public DateLiteral plusDays(int day) throws AnalysisException {
LocalDateTime dateTime;
- DateTimeFormatterBuilder builder = new DateTimeFormatterBuilder();
if (type == Type.DATE) {
dateTime = DATE_FORMATTER.parseLocalDateTime(getStringValue()).plusDays(day);
} else {
diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
index c0803f5..8f108ce 100644
--- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -49,6 +49,7 @@ import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -93,6 +94,9 @@ public class InsertStmt extends DdlStmt {
private Boolean isRepartition;
private boolean isStreaming = false;
private String label = null;
+ private boolean isUserSpecifiedLabel = false;
+ // uuid will be generated at analysis phase, and be used as loadid and query id of insert plan
+ private UUID uuid;
private Map<Long, Integer> indexIdToSchemaHash = null;
@@ -128,6 +132,10 @@ public class InsertStmt extends DdlStmt {
this.queryStmt = source.getQueryStmt();
this.planHints = hints;
this.targetColumnNames = cols;
+
+ if (!Strings.isNullOrEmpty(label)) {
+ isUserSpecifiedLabel = true;
+ }
}
// Ctor for CreateTableAsSelectStmt
@@ -216,8 +224,12 @@ public class InsertStmt extends DdlStmt {
return label;
}
- public boolean hasLabel() {
- return label != null;
+ public boolean isUserSpecifiedLabel() {
+ return isUserSpecifiedLabel;
+ }
+
+ public UUID getUUID() {
+ return uuid;
}
// Only valid when this statement is streaming
@@ -260,19 +272,19 @@ public class InsertStmt extends DdlStmt {
// create data sink
createDataSink();
+ uuid = UUID.randomUUID();
+ if (Strings.isNullOrEmpty(label)) {
+ label = "insert_" + uuid.toString();
+ }
+
if (targetTable instanceof OlapTable) {
String dbName = tblName.getDb();
// check exist
db = analyzer.getCatalog().getDb(dbName);
- // although the insert stmt maybe failed at next stage, but has to begin transaction here
- // if get transactionid at add job stage, the transaction id maybe a little larger, it maybe error at alter job to check
- // if all previous job finished
- UUID uuid = UUID.randomUUID();
- String jobLabel = "insert_" + uuid;
LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS();
transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
- jobLabel, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond);
+ label, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond);
OlapTableSink sink = (OlapTableSink) dataSink;
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
sink.init(loadId, transactionId, db.getId());
diff --git a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java
index feebc40..b234338 100644
--- a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -882,7 +882,8 @@ public class TabletScheduler extends Daemon {
throw new SchedException(Status.SCHEDULE_FAILED, "set watermark txn " + nextTxnId);
} else if (replica.getState() == ReplicaState.DECOMMISSION && replica.getWatermarkTxnId() != -1) {
long watermarkTxnId = replica.getWatermarkTxnId();
- if (!Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId, tabletCtx.getDbId())) {
+ if (!Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId,
+ tabletCtx.getDbId())) {
throw new SchedException(Status.SCHEDULE_FAILED, "wait txn before " + watermarkTxnId + " to be finished");
}
}
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index f051b68..d77462e 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -49,6 +49,7 @@ import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -79,7 +80,7 @@ public class BrokerLoadJob extends LoadJob {
// this param is used to persist the expr of columns
// the origin stmt is persisted instead of columns expr
// the expr of columns will be reanalyze when the log is replayed
- private String originStmt;
+ private String originStmt = "";
// include broker desc and data desc
private PullLoadSourceInfo dataSourceInfo = new PullLoadSourceInfo();
@@ -96,7 +97,7 @@ public class BrokerLoadJob extends LoadJob {
super(dbId, label);
this.timeoutSecond = Config.broker_load_default_timeout_second;
this.brokerDesc = brokerDesc;
- this.originStmt = originStmt;
+ this.originStmt = Strings.nullToEmpty(originStmt);
this.jobType = EtlJobType.BROKER;
this.authorizationInfo = gatherAuthInfo();
}
@@ -258,7 +259,7 @@ public class BrokerLoadJob extends LoadJob {
*/
@Override
public void analyze() {
- if (originStmt == null) {
+ if (Strings.isNullOrEmpty(originStmt)) {
return;
}
// Reset dataSourceInfo, it will be re-created in analyze
@@ -511,6 +512,8 @@ public class BrokerLoadJob extends LoadJob {
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_58) {
originStmt = Text.readString(in);
+ } else {
+ originStmt = "";
}
// The origin stmt does not be analyzed in here.
// The reason is that it will thrown MetaNotFoundException when the tableId could not be found by tableName.
diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 5ef8b4a..e615b71 100644
--- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -592,15 +592,11 @@ public class StmtExecutor {
}
long createTime = System.currentTimeMillis();
- UUID uuid = UUID.randomUUID();
- String label = insertStmt.getLabel();
- if (label == null) {
- // if label is not set, use the uuid as label
- label = uuid.toString();
- }
-
Throwable throwable = null;
+ UUID uuid = insertStmt.getUUID();
+ String label = insertStmt.getLabel();
+
long loadedRows = 0;
int filteredRows = 0;
try {
@@ -676,7 +672,7 @@ public class StmtExecutor {
LOG.warn("errors when abort txn", abortTxnException);
}
- if (!Config.using_old_load_usage_pattern && !insertStmt.hasLabel()) {
+ if (!Config.using_old_load_usage_pattern && !insertStmt.isUserSpecifiedLabel()) {
// if not using old usage pattern, or user not specify label,
// the exception will be thrown to user directly without a label
StringBuilder sb = new StringBuilder(t.getMessage());
@@ -700,7 +696,7 @@ public class StmtExecutor {
// 2. using_old_load_usage_pattern is set to true, means a label will be returned for user to show load.
// 3. has filtered rows. so a label should be returned for user to show
// 4. user specify a label for insert stmt
- if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern || filteredRows > 0 || insertStmt.hasLabel()) {
+ if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern || filteredRows > 0 || insertStmt.isUserSpecifiedLabel()) {
try {
context.getCatalog().getLoadManager().recordFinishedLoadJob(
label,
diff --git a/fe/src/main/java/org/apache/doris/rewrite/FEFunctions.java b/fe/src/main/java/org/apache/doris/rewrite/FEFunctions.java
index e1aad18..a8b7c57 100644
--- a/fe/src/main/java/org/apache/doris/rewrite/FEFunctions.java
+++ b/fe/src/main/java/org/apache/doris/rewrite/FEFunctions.java
@@ -120,18 +120,28 @@ public class FEFunctions {
return new IntLiteral(unixTime, Type.INT);
}
+ @FEFunction(name = "unix_timestamp", argTypes = { "DATE" }, returnType = "INT")
+ public static IntLiteral unixTimestamp2(LiteralExpr arg) throws AnalysisException {
+ long unixTime = ((DateLiteral) arg).unixTimestamp(TimeUtils.getTimeZone()) / 1000;
+ // date before 1970-01-01 or after 2038-01-19 03:14:07 should return 0 for unix_timestamp() function
+ unixTime = unixTime < 0 ? 0 : unixTime;
+ unixTime = unixTime > Integer.MAX_VALUE ? 0 : unixTime;
+ return new IntLiteral(unixTime, Type.INT);
+ }
+
@FEFunction(name = "from_unixtime", argTypes = { "INT" }, returnType = "VARCHAR")
public static StringLiteral fromUnixTime(LiteralExpr unixTime) throws AnalysisException {
- //if unixTime < 0, we should return null, throw a exception and let BE process
+ // if unixTime < 0, we should return null, throw a exception and let BE process
if (unixTime.getLongValue() < 0) {
throw new AnalysisException("unixtime should larger than zero");
}
DateLiteral dl = new DateLiteral(unixTime.getLongValue() * 1000, TimeUtils.getTimeZone(), Type.DATETIME);
return new StringLiteral(dl.getStringValue());
}
+
@FEFunction(name = "from_unixtime", argTypes = { "INT", "VARCHAR" }, returnType = "VARCHAR")
public static StringLiteral fromUnixTime(LiteralExpr unixTime, StringLiteral fmtLiteral) throws AnalysisException {
- //if unixTime < 0, we should return null, throw a exception and let BE process
+ // if unixTime < 0, we should return null, throw a exception and let BE process
if (unixTime.getLongValue() < 0) {
throw new AnalysisException("unixtime should larger than zero");
}
diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index ad88468..33d7c87 100644
--- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -758,7 +758,7 @@ public class GlobalTransactionMgr {
continue;
}
if (entry.getKey() <= endTransactionId) {
- LOG.info("find a running txn with txn_id={} on db: {}, less than watermark txn_id {}",
+ LOG.debug("find a running txn with txn_id={} on db: {}, less than watermark txn_id {}",
entry.getKey(), dbId, endTransactionId);
return false;
}
diff --git a/fe/src/test/java/org/apache/doris/rewrite/FEFunctionsTest.java b/fe/src/test/java/org/apache/doris/rewrite/FEFunctionsTest.java
index 652d640..b54a9f0 100644
--- a/fe/src/test/java/org/apache/doris/rewrite/FEFunctionsTest.java
+++ b/fe/src/test/java/org/apache/doris/rewrite/FEFunctionsTest.java
@@ -54,6 +54,7 @@ public class FEFunctionsTest {
timestamp = FEFunctions.unixTimestamp(new DateLiteral("2038-01-19 03:14:07", Type.DATETIME));
// CST time zone
Assert.assertEquals(Integer.MAX_VALUE - 8 * 3600, timestamp.getValue());
+
} catch (AnalysisException e) {
e.printStackTrace();
Assert.fail();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org