You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2019/10/17 06:55:44 UTC

[incubator-doris] branch branch-0.11 updated: Limit the memory consumption of broker scan node (#1996)

This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch branch-0.11
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/branch-0.11 by this push:
     new 4ef5a8c  Limit the memory consumption of broker scan node (#1996)
4ef5a8c is described below

commit 4ef5a8c8560351d7fff7ff8fd51c4c7a75e006a8
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Thu Oct 17 14:40:16 2019 +0800

    Limit the memory consumption of broker scan node (#1996)
    
    If memory usage exceeds the limit, no more row batches will be pushed to the batch queue
---
 be/src/exec/broker_scan_node.cpp                   |  8 +++++--
 be/src/exec/olap_scan_node.cpp                     |  2 +-
 be/src/olap/task/engine_storage_migration_task.cpp |  2 +-
 .../main/java/org/apache/doris/alter/AlterJob.java |  4 ++--
 .../org/apache/doris/analysis/DateLiteral.java     |  3 +--
 .../java/org/apache/doris/analysis/InsertStmt.java | 28 +++++++++++++++-------
 .../org/apache/doris/clone/TabletScheduler.java    |  3 ++-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |  9 ++++---
 .../java/org/apache/doris/qe/StmtExecutor.java     | 14 ++++-------
 .../java/org/apache/doris/rewrite/FEFunctions.java | 14 +++++++++--
 .../doris/transaction/GlobalTransactionMgr.java    |  2 +-
 .../org/apache/doris/rewrite/FEFunctionsTest.java  |  1 +
 12 files changed, 58 insertions(+), 32 deletions(-)

diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 5e9fccf..242b5cb 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -39,7 +39,7 @@ BrokerScanNode::BrokerScanNode(
             _tuple_desc(nullptr),
             _num_running_scanners(0),
             _scan_finished(false),
-            _max_buffered_batches(1024),
+            _max_buffered_batches(32),
             _wait_scanner_timer(nullptr) {
 }
 
@@ -360,7 +360,11 @@ Status BrokerScanNode::scanner_scan(
             while (_process_status.ok() && 
                    !_scan_finished.load() && 
                    !_runtime_state->is_cancelled() &&
-                   _batch_queue.size() >= _max_buffered_batches) {
+                    // stop pushing more batches if
+                    // 1. too many batches are in the queue, or
+                    // 2. at least one batch is in the queue and the memory limit is exceeded.
+                   (_batch_queue.size() >= _max_buffered_batches
+                    || (mem_tracker()->limit_exceeded() && !_batch_queue.empty()))) {
                 _queue_writer_cond.wait_for(l, std::chrono::seconds(1));
             }
             // Process already set failed, so we just return OK
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 4acddb4..1310224 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -1199,7 +1199,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
         row_batch->set_scanner_id(scanner->id());
         status = scanner->get_batch(_runtime_state, row_batch, &eos);
         if (!status.ok()) {
-            LOG(WARNING) << "Scan thread read OlapScanner failed!";
+            LOG(WARNING) << "Scan thread read OlapScanner failed: " << status.to_string();
             eos = true;
             break;
         }
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp
index d6da60d..8d468b7 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -110,7 +110,7 @@ OLAPStatus EngineStorageMigrationTask::_storage_medium_migrate(
         }
         tablet->release_header_lock();
 
-        // generate schema hash path where files will be migrated
+        // get a random store of specified storage medium
         auto stores = StorageEngine::instance()->get_stores_for_create_tablet(storage_medium);
         if (stores.empty()) {
             res = OLAP_ERR_INVALID_ROOT_PATH;
diff --git a/fe/src/main/java/org/apache/doris/alter/AlterJob.java b/fe/src/main/java/org/apache/doris/alter/AlterJob.java
index 91ae084..3a9992b 100644
--- a/fe/src/main/java/org/apache/doris/alter/AlterJob.java
+++ b/fe/src/main/java/org/apache/doris/alter/AlterJob.java
@@ -289,8 +289,8 @@ public abstract class AlterJob implements Writable {
         if (isPreviousLoadFinished) {
             return true;
         } else {
-            isPreviousLoadFinished = Catalog.getCurrentGlobalTransactionMgr()
-                    .isPreviousTransactionsFinished(transactionId, dbId);
+            isPreviousLoadFinished = Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
+                    transactionId, dbId);
             return isPreviousLoadFinished;
         }
     }
diff --git a/fe/src/main/java/org/apache/doris/analysis/DateLiteral.java b/fe/src/main/java/org/apache/doris/analysis/DateLiteral.java
index 4fdb3bd..9d2b58a 100644
--- a/fe/src/main/java/org/apache/doris/analysis/DateLiteral.java
+++ b/fe/src/main/java/org/apache/doris/analysis/DateLiteral.java
@@ -203,7 +203,7 @@ public class DateLiteral extends LiteralExpr {
             second = dateTime.getSecondOfMinute();
             this.type = type;
         } catch (Exception ex) {
-            throw new AnalysisException("date literal [" + s + "] is valid");
+            throw new AnalysisException("date literal [" + s + "] is invalid");
         }
     }
 
@@ -542,7 +542,6 @@ public class DateLiteral extends LiteralExpr {
 
     public DateLiteral plusDays(int day) throws AnalysisException {
         LocalDateTime dateTime;
-        DateTimeFormatterBuilder builder = new DateTimeFormatterBuilder();
         if (type == Type.DATE) {
             dateTime = DATE_FORMATTER.parseLocalDateTime(getStringValue()).plusDays(day);                                        
         } else {
diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
index c0803f5..8f108ce 100644
--- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -49,6 +49,7 @@ import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -93,6 +94,9 @@ public class InsertStmt extends DdlStmt {
     private Boolean isRepartition;
     private boolean isStreaming = false;
     private String label = null;
+    private boolean isUserSpecifiedLabel = false;
+    // uuid is generated in the analysis phase and is used as both the load id and the query id of the insert plan
+    private UUID uuid;
 
     private Map<Long, Integer> indexIdToSchemaHash = null;
 
@@ -128,6 +132,10 @@ public class InsertStmt extends DdlStmt {
         this.queryStmt = source.getQueryStmt();
         this.planHints = hints;
         this.targetColumnNames = cols;
+
+        if (!Strings.isNullOrEmpty(label)) {
+            isUserSpecifiedLabel = true;
+        }
     }
 
     // Ctor for CreateTableAsSelectStmt
@@ -216,8 +224,12 @@ public class InsertStmt extends DdlStmt {
         return label;
     }
 
-    public boolean hasLabel() {
-        return label != null;
+    public boolean isUserSpecifiedLabel() {
+        return isUserSpecifiedLabel;
+    }
+
+    public UUID getUUID() {
+        return uuid;
     }
 
     // Only valid when this statement is streaming
@@ -260,19 +272,19 @@ public class InsertStmt extends DdlStmt {
         // create data sink
         createDataSink();
 
+        uuid = UUID.randomUUID();
+        if (Strings.isNullOrEmpty(label)) {
+            label = "insert_" + uuid.toString();
+        }
+
         if (targetTable instanceof OlapTable) {
             String dbName = tblName.getDb();
             // check exist
             db = analyzer.getCatalog().getDb(dbName);
-            // although the insert stmt maybe failed at next stage, but has to begin transaction here
-            // if get transactionid at add job stage, the transaction id maybe a little larger, it maybe error at alter job to check
-            // if all previous job finished
-            UUID uuid = UUID.randomUUID();
-            String jobLabel = "insert_" + uuid;
             LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
             long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS();
             transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
-                    jobLabel, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond);
+                    label, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond);
             OlapTableSink sink = (OlapTableSink) dataSink;
             TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
             sink.init(loadId, transactionId, db.getId());
diff --git a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java
index feebc40..b234338 100644
--- a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -882,7 +882,8 @@ public class TabletScheduler extends Daemon {
             throw new SchedException(Status.SCHEDULE_FAILED, "set watermark txn " + nextTxnId);
         } else if (replica.getState() == ReplicaState.DECOMMISSION && replica.getWatermarkTxnId() != -1) {
             long watermarkTxnId = replica.getWatermarkTxnId();
-            if (!Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId, tabletCtx.getDbId())) {
+            if (!Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId,
+                    tabletCtx.getDbId())) {
                 throw new SchedException(Status.SCHEDULE_FAILED, "wait txn before " + watermarkTxnId + " to be finished");
             }
         }
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index f051b68..d77462e 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -49,6 +49,7 @@ import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionState;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -79,7 +80,7 @@ public class BrokerLoadJob extends LoadJob {
     // this param is used to persist the expr of columns
     // the origin stmt is persisted instead of columns expr
     // the expr of columns will be reanalyze when the log is replayed
-    private String originStmt;
+    private String originStmt = "";
 
     // include broker desc and data desc
     private PullLoadSourceInfo dataSourceInfo = new PullLoadSourceInfo();
@@ -96,7 +97,7 @@ public class BrokerLoadJob extends LoadJob {
         super(dbId, label);
         this.timeoutSecond = Config.broker_load_default_timeout_second;
         this.brokerDesc = brokerDesc;
-        this.originStmt = originStmt;
+        this.originStmt = Strings.nullToEmpty(originStmt);
         this.jobType = EtlJobType.BROKER;
         this.authorizationInfo = gatherAuthInfo();
     }
@@ -258,7 +259,7 @@ public class BrokerLoadJob extends LoadJob {
      */
     @Override
     public void analyze() {
-        if (originStmt == null) {
+        if (Strings.isNullOrEmpty(originStmt)) {
             return;
         }
         // Reset dataSourceInfo, it will be re-created in analyze
@@ -511,6 +512,8 @@ public class BrokerLoadJob extends LoadJob {
 
         if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_58) {
             originStmt = Text.readString(in);
+        } else {
+            originStmt = "";
         }
         // The origin stmt does not be analyzed in here.
         // The reason is that it will thrown MetaNotFoundException when the tableId could not be found by tableName.
diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 5ef8b4a..e615b71 100644
--- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -592,15 +592,11 @@ public class StmtExecutor {
         }
 
         long createTime = System.currentTimeMillis();
-        UUID uuid = UUID.randomUUID();
-        String label = insertStmt.getLabel();
-        if (label == null) {
-            // if label is not set, use the uuid as label
-            label = uuid.toString();
-        }
-
         Throwable throwable = null;
 
+        UUID uuid = insertStmt.getUUID();
+        String label = insertStmt.getLabel();
+
         long loadedRows = 0;
         int filteredRows = 0;
         try {
@@ -676,7 +672,7 @@ public class StmtExecutor {
                 LOG.warn("errors when abort txn", abortTxnException);
             }
 
-            if (!Config.using_old_load_usage_pattern && !insertStmt.hasLabel()) {
+            if (!Config.using_old_load_usage_pattern && !insertStmt.isUserSpecifiedLabel()) {
                 // if not using old usage pattern, or user not specify label,
                 // the exception will be thrown to user directly without a label
                 StringBuilder sb = new StringBuilder(t.getMessage());
@@ -700,7 +696,7 @@ public class StmtExecutor {
         // 2. using_old_load_usage_pattern is set to true, means a label will be returned for user to show load.
         // 3. has filtered rows. so a label should be returned for user to show
         // 4. user specify a label for insert stmt
-        if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern || filteredRows > 0 || insertStmt.hasLabel()) {
+        if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern || filteredRows > 0 || insertStmt.isUserSpecifiedLabel()) {
             try {
                 context.getCatalog().getLoadManager().recordFinishedLoadJob(
                         label,
diff --git a/fe/src/main/java/org/apache/doris/rewrite/FEFunctions.java b/fe/src/main/java/org/apache/doris/rewrite/FEFunctions.java
index e1aad18..a8b7c57 100644
--- a/fe/src/main/java/org/apache/doris/rewrite/FEFunctions.java
+++ b/fe/src/main/java/org/apache/doris/rewrite/FEFunctions.java
@@ -120,18 +120,28 @@ public class FEFunctions {
         return new IntLiteral(unixTime, Type.INT);
     }
 
+    @FEFunction(name = "unix_timestamp", argTypes = { "DATE" }, returnType = "INT")
+    public static IntLiteral unixTimestamp2(LiteralExpr arg) throws AnalysisException {
+        long unixTime = ((DateLiteral) arg).unixTimestamp(TimeUtils.getTimeZone()) / 1000;
+        // date before 1970-01-01 or after 2038-01-19 03:14:07 should return 0 for unix_timestamp() function
+        unixTime = unixTime < 0 ? 0 : unixTime;
+        unixTime = unixTime > Integer.MAX_VALUE ? 0 : unixTime;
+        return new IntLiteral(unixTime, Type.INT);
+    }
+
     @FEFunction(name = "from_unixtime", argTypes = { "INT" }, returnType = "VARCHAR")
     public static StringLiteral fromUnixTime(LiteralExpr unixTime) throws AnalysisException {
-        //if unixTime < 0, we should return null, throw a exception and let BE process
+        // if unixTime < 0, we should return null; throw an exception and let BE process it
         if (unixTime.getLongValue() < 0) {
             throw new AnalysisException("unixtime should larger than zero");
         }
         DateLiteral dl = new DateLiteral(unixTime.getLongValue() * 1000, TimeUtils.getTimeZone(), Type.DATETIME);
         return new StringLiteral(dl.getStringValue());
     }
+
     @FEFunction(name = "from_unixtime", argTypes = { "INT", "VARCHAR" }, returnType = "VARCHAR")
     public static StringLiteral fromUnixTime(LiteralExpr unixTime, StringLiteral fmtLiteral) throws AnalysisException {
-        //if unixTime < 0, we should return null, throw a exception and let BE process
+        // if unixTime < 0, we should return null; throw an exception and let BE process it
         if (unixTime.getLongValue() < 0) {
             throw new AnalysisException("unixtime should larger than zero");
         }
diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index ad88468..33d7c87 100644
--- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -758,7 +758,7 @@ public class GlobalTransactionMgr {
                     continue;
                 }
                 if (entry.getKey() <= endTransactionId) {
-                    LOG.info("find a running txn with txn_id={} on db: {}, less than watermark txn_id {}",
+                    LOG.debug("find a running txn with txn_id={} on db: {}, less than watermark txn_id {}",
                             entry.getKey(), dbId, endTransactionId);
                     return false;
                 }
diff --git a/fe/src/test/java/org/apache/doris/rewrite/FEFunctionsTest.java b/fe/src/test/java/org/apache/doris/rewrite/FEFunctionsTest.java
index 652d640..b54a9f0 100644
--- a/fe/src/test/java/org/apache/doris/rewrite/FEFunctionsTest.java
+++ b/fe/src/test/java/org/apache/doris/rewrite/FEFunctionsTest.java
@@ -54,6 +54,7 @@ public class FEFunctionsTest {
             timestamp = FEFunctions.unixTimestamp(new DateLiteral("2038-01-19 03:14:07", Type.DATETIME));
             // CST time zone
             Assert.assertEquals(Integer.MAX_VALUE - 8 * 3600, timestamp.getValue());
+
         } catch (AnalysisException e) {
             e.printStackTrace();
             Assert.fail();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org