You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/16 21:34:02 UTC

impala git commit: IMPALA-7096: restore scanner thread memory heuristics

Repository: impala
Updated Branches:
  refs/heads/master 59435fe0a -> 7ccf73690


IMPALA-7096: restore scanner thread memory heuristics

This restores some of the heuristics removed in IMPALA-4835 that can
help prevent scans from hitting OOM conditions. The heuristics are implemented
at the query level rather than in each scan node in isolation.

Introduce a ScannerMemLimiter class, owned by the QueryState, that
tracks the amount of memory estimated to be consumed by all scanner
threads running for the query on the current backend.

Also check soft memory limits to see if scanner threads should be
started or the current scanner thread should stop.

The long-term plan is to switch to the MT scan node implementations.
When that happens this code can be removed. In the meantime this
code is imperfect but will help avoid OOM in many scenarios.

Testing:
Added regression tests for HDFS and Kudu where we previously could
run out of memory with a low mem_limit.

Manual testing:
* Ran query tests with --thread_creation_fault_injection=true for a
  bit, confirmed no crashes.
* Ran single-node stress test for Kudu and Parquet for 10-20 min each.

Change-Id: Ib9907fa8c4d2b0b85f67f4f160899c1c258ad82b
Reviewed-on: http://gerrit.cloudera.org:8080/11103
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/7ccf7369
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/7ccf7369
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/7ccf7369

Branch: refs/heads/master
Commit: 7ccf7369085aa49a8fc0daf6f91d97b8a3135682
Parents: 59435fe
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Jul 13 14:28:57 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Aug 16 21:25:00 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node.cc                   |  90 ++++++++++++---
 be/src/exec/hdfs-scan-node.h                    |  14 +--
 be/src/exec/kudu-scan-node-base.h               |   9 +-
 be/src/exec/kudu-scan-node.cc                   |  53 ++++++++-
 be/src/exec/kudu-scan-node.h                    |   8 +-
 be/src/exec/scan-node.cc                        |  29 ++++-
 be/src/exec/scan-node.h                         |  25 ++--
 be/src/runtime/CMakeLists.txt                   |   1 +
 be/src/runtime/query-state.cc                   |   2 +
 be/src/runtime/query-state.h                    |  10 +-
 be/src/runtime/scanner-mem-limiter.cc           | 114 +++++++++++++++++++
 be/src/runtime/scanner-mem-limiter.h            |  74 ++++++++++++
 .../hdfs-scanner-thread-mem-scaling.test        |  35 ++++++
 .../queries/QueryTest/kudu-scan-mem-usage.test  |  16 +++
 tests/query_test/test_mem_usage_scaling.py      |  13 +++
 15 files changed, 444 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/7ccf7369/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 9db0383..7e99fea 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -27,11 +27,13 @@
 #include "runtime/descriptors.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/io/request-context.h"
-#include "runtime/runtime-filter.inline.h"
-#include "runtime/runtime-state.h"
 #include "runtime/mem-tracker.h"
-#include "runtime/row-batch.h"
+#include "runtime/query-state.h"
 #include "runtime/row-batch-queue.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-filter.inline.h"
+#include "runtime/runtime-state.h"
+#include "runtime/scanner-mem-limiter.h"
 #include "runtime/thread-resource-mgr.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
@@ -49,6 +51,18 @@ DECLARE_bool(skip_file_runtime_filtering);
 using namespace impala;
 using namespace impala::io;
 
+// Amount of memory that we approximate a scanner thread will use not including I/O
+// buffers. The memory used does not vary considerably between file formats (just a
+// couple of MBs). This value is conservative and taken from running against the tpch
+// lineitem table. Note: this is a crude heuristic to help reduce odds of OOM until
+// we can remove the multithreaded scanners.
+DEFINE_int64_hidden(hdfs_scanner_thread_max_estimated_bytes, 32L * 1024L * 1024L,
+    "Estimated bytes of memory consumed by HDFS scanner thread.");
+
+// Estimated upper bound on the compression ratio of compressed text files. Used to
+// estimate scanner thread memory usage.
+const int COMPRESSED_TEXT_COMPRESSION_RATIO = 11;
+
 // Amount of time to block waiting for GetNext() to release scanner threads between
 // checking if a scanner thread should yield itself back to the global thread pool.
 const int SCANNER_THREAD_WAIT_TIME_MS = 20;
@@ -132,7 +146,7 @@ Status HdfsScanNode::GetNextInternal(
 Status HdfsScanNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
-  thread_state_.Prepare(this);
+  thread_state_.Prepare(this, EstimateScannerThreadMemConsumption());
   scanner_thread_reservations_denied_counter_ =
       ADD_COUNTER(runtime_profile(), "NumScannerThreadReservationsDenied", TUnit::UNIT);
   return Status::OK();
@@ -157,7 +171,7 @@ void HdfsScanNode::Close(RuntimeState* state) {
   if (thread_avail_cb_id_ != -1) {
     state->resource_pool()->RemoveThreadAvailableCb(thread_avail_cb_id_);
   }
-  thread_state_.Close();
+  thread_state_.Close(this);
   HdfsScanNodeBase::Close(state);
 }
 
@@ -186,6 +200,31 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges,
   return Status::OK();
 }
 
+int64_t HdfsScanNode::EstimateScannerThreadMemConsumption() const {
+  // Start with the minimum I/O buffer requirement.
+  int64_t est_total_bytes = resource_profile_.min_reservation;
+
+  // Next add in the other memory that we estimate the scanner thread will use,
+  // e.g. decompression buffers, tuple buffers, etc.
+  // For compressed text, we estimate this based on the file size (since the whole file
+  // will need to be decompressed at once). For all other formats, we use a constant.
+  // Note: this is crude and we could try to refine it by factoring in the number of
+  // columns, etc, but it is unclear how beneficial this would be.
+  int64_t est_non_reserved_bytes = FLAGS_hdfs_scanner_thread_max_estimated_bytes;
+  auto it = per_type_files_.find(THdfsFileFormat::TEXT);
+  if (it != per_type_files_.end()) {
+    for (HdfsFileDesc* file : it->second) {
+      if (file->file_compression != THdfsCompression::NONE) {
+        int64_t compressed_text_est_bytes =
+            file->file_length * COMPRESSED_TEXT_COMPRESSION_RATIO;
+        est_non_reserved_bytes = max(compressed_text_est_bytes, est_non_reserved_bytes);
+      }
+    }
+  }
+  est_total_bytes += est_non_reserved_bytes;
+  return est_total_bytes;
+}
+
 void HdfsScanNode::ReturnReservationFromScannerThread(const unique_lock<mutex>& lock,
     int64_t bytes) {
   DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
@@ -205,18 +244,22 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
   //  3. Don't start up if the number of ranges left is less than the number of
   //     active scanner threads.
   //  4. Don't start up if no initial ranges have been issued (see IMPALA-1722).
-  //  5. Don't start up a ScannerThread if materialized_row_batches_ is full since
+  //  5. Don't start up a ScannerThread if the row batch queue is full since
   //     we are not scanner bound.
-  //  6. Don't start up a thread if it is an extra thread and we can't reserve another
+  //  6. Don't start up a thread if there is not enough memory available for the
+  //     estimated memory consumption (including reservation and non-reserved memory).
+  //  7. Don't start up a thread if it is an extra thread and we can't reserve another
   //     minimum reservation's worth of memory for the thread.
-  //  7. Don't start up more than maximum number of scanner threads configured.
-  //  8. Don't start up if there are no thread tokens.
+  //  8. Don't start up more than maximum number of scanner threads configured.
+  //  9. Don't start up if there are no thread tokens.
 
   // Case 4. We have not issued the initial ranges so don't start a scanner thread.
   // Issuing ranges will call this function and we'll start the scanner threads then.
   // TODO: It would be good to have a test case for that.
   if (!initial_ranges_issued_) return;
 
+  ScannerMemLimiter* scanner_mem_limiter =
+      runtime_state_->query_state()->scanner_mem_limiter();
   Status status = Status::OK();
   while (true) {
     // The lock must be given up between loops in order to give writers to done_,
@@ -227,6 +270,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
 
     const int64_t num_active_scanner_threads = thread_state_.GetNumActive();
     const bool first_thread = num_active_scanner_threads == 0;
+    const int64_t est_mem = thread_state_.estimated_per_thread_mem();
     const int64_t scanner_thread_reservation = resource_profile_.min_reservation;
     // Cases 1, 2, 3.
     if (done_ || all_ranges_started_ ||
@@ -235,21 +279,28 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
     }
 
     if (!first_thread) {
-      // Cases 5 and 6.
+      // Cases 5, 6 and 7.
       if (thread_state_.batch_queue()->AtCapacity()) break;
+      if (!scanner_mem_limiter->ClaimMemoryForScannerThread(this, est_mem)) {
+        COUNTER_ADD(thread_state_.scanner_thread_mem_unavailable_counter(), 1);
+        break;
+      }
+
       // The node's min reservation is for the first thread so we don't need to check
       if (!buffer_pool_client()->IncreaseReservation(scanner_thread_reservation)) {
+        scanner_mem_limiter->ReleaseMemoryForScannerThread(this, est_mem);
         COUNTER_ADD(scanner_thread_reservations_denied_counter_, 1);
         break;
       }
     }
 
-    // Case 7 and 8.
+    // Case 8 and 9.
     if (first_thread) {
       // The first thread is required to make progress on the scan.
       pool->AcquireThreadToken();
     } else if (thread_state_.GetNumActive() >= thread_state_.max_num_scanner_threads()
         || !pool->TryAcquireThreadToken()) {
+      scanner_mem_limiter->ReleaseMemoryForScannerThread(this, est_mem);
       ReturnReservationFromScannerThread(lock, scanner_thread_reservation);
       break;
     }
@@ -261,9 +312,12 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
       this->ScannerThread(first_thread, scanner_thread_reservation);
     };
     std::unique_ptr<Thread> t;
-    status =
-      Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
+    status = Thread::Create(
+        FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
     if (!status.ok()) {
+      if (!first_thread) {
+        scanner_mem_limiter->ReleaseMemoryForScannerThread(this, est_mem);
+      }
       ReturnReservationFromScannerThread(lock, scanner_thread_reservation);
       // Release the token and skip running callbacks to find a replacement. Skipping
       // serves two purposes. First, it prevents a mutual recursion between this function
@@ -352,6 +406,11 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
       break;
     }
 
+    // Stop extra threads if we're over a soft limit in order to free up memory.
+    if (!first_thread && mem_tracker_->AnyLimitExceeded(MemLimit::SOFT)) {
+      break;
+    }
+
     // Done with range and it completed successfully
     if (progress_.done()) {
       // All ranges are finished.  Indicate we are done.
@@ -377,6 +436,11 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
   filter_mem_pool.FreeAll();
   expr_results_pool.FreeAll();
   runtime_state_->resource_pool()->ReleaseThreadToken(first_thread);
+  if (!first_thread) {
+    // Memory for the first thread is released in thread_state_.Close().
+    runtime_state_->query_state()->scanner_mem_limiter()->ReleaseMemoryForScannerThread(
+        this, thread_state_.estimated_per_thread_mem());
+  }
   thread_state_.DecrementNumActive();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/7ccf7369/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 41d919e..bc53fab 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -15,9 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
-#ifndef IMPALA_EXEC_HDFS_SCAN_NODE_H_
-#define IMPALA_EXEC_HDFS_SCAN_NODE_H_
+#pragma once
 
 #include <map>
 #include <memory>
@@ -135,10 +133,14 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// -1 if no callback is registered.
   int thread_avail_cb_id_ = -1;
 
-  // Number of times scanner threads were not created because of reservation increase
-  // being denied.
+  /// Number of times scanner threads were not created because of reservation increase
+  /// being denied.
   RuntimeProfile::Counter* scanner_thread_reservations_denied_counter_ = nullptr;
 
+  /// Compute the estimated memory consumption of a scanner thread in bytes for the
+  /// purposes of deciding whether to start a new scanner thread.
+  int64_t EstimateScannerThreadMemConsumption() const;
+
   /// Tries to spin up as many scanner threads as the quota allows. Called explicitly
   /// (e.g., when adding new ranges) or when threads are available for this scan node.
   void ThreadTokenAvailableCb(ThreadResourcePool* pool);
@@ -184,5 +186,3 @@ class HdfsScanNode : public HdfsScanNodeBase {
 };
 
 }
-
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/7ccf7369/be/src/exec/kudu-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h
index 8ec87c6..e9ac60d 100644
--- a/be/src/exec/kudu-scan-node-base.h
+++ b/be/src/exec/kudu-scan-node-base.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef IMPALA_EXEC_KUDU_SCAN_NODE_BASE_H_
-#define IMPALA_EXEC_KUDU_SCAN_NODE_BASE_H_
+#pragma once
 
 #include <gtest/gtest.h>
 #include <kudu/client/client.h>
@@ -56,6 +55,8 @@ class KuduScanNodeBase : public ScanNode {
   /// Not thread safe, access must be synchronized.
   const std::string* GetNextScanToken();
 
+  const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
+
  private:
   friend class KuduScanner;
 
@@ -83,11 +84,7 @@ class KuduScanNodeBase : public ScanNode {
   static const std::string KUDU_ROUND_TRIPS;
   static const std::string KUDU_REMOTE_TOKENS;
 
-  const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
   kudu::client::KuduClient* kudu_client() { return client_; }
   RuntimeProfile::Counter* kudu_round_trips() const { return kudu_round_trips_; }
 };
-
 }
-
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/7ccf7369/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 30194f9..030196a 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -25,9 +25,11 @@
 #include "gutil/gscoped_ptr.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/mem-pool.h"
-#include "runtime/runtime-state.h"
-#include "runtime/row-batch.h"
+#include "runtime/query-state.h"
 #include "runtime/row-batch-queue.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/scanner-mem-limiter.h"
 #include "runtime/thread-resource-mgr.h"
 #include "runtime/tuple-row.h"
 #include "util/runtime-profile-counters.h"
@@ -37,6 +39,18 @@
 DEFINE_int32(kudu_max_row_batches, 0, "The maximum size of the row batch queue, "
     " for Kudu scanners.");
 
+// Empirically derived estimate for the Kudu scan's memory consumption per column of
+// data materialized.
+DEFINE_int64_hidden(kudu_scanner_thread_estimated_bytes_per_column, 384L * 1024L,
+    "Estimated bytes of memory per materialized column consumed by Kudu scanner thread.");
+
+// Empirically derived estimate for the maximum consumption of Kudu scan, based on
+// experiments with 250-column table with num_scanner_threads=1, where I wasn't able
+// to coax the scan to use more than 25MB of memory.
+DEFINE_int64_hidden(kudu_scanner_thread_max_estimated_bytes, 32L * 1024L * 1024L,
+    "Estimated maximum bytes of memory consumed by Kudu scanner thread for high column "
+    "counts.");
+
 namespace impala {
 
 KuduScanNode::KuduScanNode(ObjectPool* pool, const TPlanNode& tnode,
@@ -53,7 +67,7 @@ KuduScanNode::~KuduScanNode() {
 
 Status KuduScanNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(KuduScanNodeBase::Prepare(state));
-  thread_state_.Prepare(this);
+  thread_state_.Prepare(this, EstimateScannerThreadMemConsumption());
   return Status::OK();
 }
 
@@ -119,22 +133,44 @@ void KuduScanNode::Close(RuntimeState* state) {
 
   SetDone();
 
-  thread_state_.Close();
+  thread_state_.Close(this);
   KuduScanNodeBase::Close(state);
 }
 
+int64_t KuduScanNode::EstimateScannerThreadMemConsumption() {
+  int64_t num_cols = max<int64_t>(1, tuple_desc()->slots().size());
+  return min(FLAGS_kudu_scanner_thread_max_estimated_bytes,
+      FLAGS_kudu_scanner_thread_estimated_bytes_per_column * num_cols);
+}
+
 void KuduScanNode::ThreadAvailableCb(ThreadResourcePool* pool) {
+  ScannerMemLimiter* mem_limiter = runtime_state_->query_state()->scanner_mem_limiter();
   while (true) {
     unique_lock<mutex> lock(lock_);
     // All done or all tokens are assigned.
     if (done_ || !HasScanToken()) break;
     bool first_thread = thread_state_.GetNumActive() == 0;
 
+    // * Don't start up a ScannerThread if the row batch queue is full since
+    //    we are not scanner bound.
+    // * Don't start up a thread if there is not enough memory available for the
+    //    estimated memory consumption (including reservation and non-reserved memory).
+    if (!first_thread) {
+      if (thread_state_.batch_queue()->AtCapacity()) break;
+      if (!mem_limiter->ClaimMemoryForScannerThread(
+              this, EstimateScannerThreadMemConsumption())) {
+        COUNTER_ADD(thread_state_.scanner_thread_mem_unavailable_counter(), 1);
+        break;
+      }
+    }
+
     // Check if we can get a token. We need at least one thread to run.
     if (first_thread) {
       pool->AcquireThreadToken();
     } else if (thread_state_.GetNumActive() >= thread_state_.max_num_scanner_threads()
         || !pool->TryAcquireThreadToken()) {
+      mem_limiter->ReleaseMemoryForScannerThread(
+          this, EstimateScannerThreadMemConsumption());
       break;
     }
 
@@ -157,6 +193,10 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourcePool* pool) {
       // and ReleaseThreadToken()->InvokeCallbacks(). Second, Thread::Create() failed and
       // is likely to continue failing for future callbacks.
       pool->ReleaseThreadToken(first_thread, true);
+      if (!first_thread) {
+        mem_limiter->ReleaseMemoryForScannerThread(
+            this, EstimateScannerThreadMemConsumption());
+      }
 
       // Abort the query. This is still holding the lock_, so done_ is known to be
       // false and status_ must be ok.
@@ -235,6 +275,11 @@ void KuduScanNode::RunScannerThread(
   // lock_ is released before calling ThreadResourceMgr::ReleaseThreadToken() which
   // invokes ThreadAvailableCb() which attempts to take the same lock.
   VLOG_RPC << "Thread done: " << name;
+  if (!first_thread) {
+    ScannerMemLimiter* mem_limiter = runtime_state_->query_state()->scanner_mem_limiter();
+    mem_limiter->ReleaseMemoryForScannerThread(
+        this, EstimateScannerThreadMemConsumption());
+  }
   runtime_state_->resource_pool()->ReleaseThreadToken(first_thread);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/7ccf7369/be/src/exec/kudu-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h
index 3b5ca75..a04c162 100644
--- a/be/src/exec/kudu-scan-node.h
+++ b/be/src/exec/kudu-scan-node.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef IMPALA_EXEC_KUDU_SCAN_NODE_H_
-#define IMPALA_EXEC_KUDU_SCAN_NODE_H_
+#pragma once
 
 #include <boost/scoped_ptr.hpp>
 #include <gtest/gtest.h>
@@ -74,6 +73,9 @@ class KuduScanNode : public KuduScanNodeBase {
   /// -1 if no callback is registered.
   int thread_avail_cb_id_;
 
+  /// Compute the estimated memory consumption of each Kudu scanner thread.
+  int64_t EstimateScannerThreadMemConsumption();
+
   /// Called when scanner threads are available for this scan node. This will
   /// try to spin up as many scanner threads as the quota allows.
   void ThreadAvailableCb(ThreadResourcePool* pool);
@@ -104,5 +106,3 @@ class KuduScanNode : public KuduScanNodeBase {
 };
 
 }
-
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/7ccf7369/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 85e1953..c764b5a 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -21,10 +21,12 @@
 
 #include "exprs/scalar-expr.h"
 #include "runtime/io/disk-io-mgr.h"
-#include "runtime/row-batch.h"
+#include "runtime/query-state.h"
 #include "runtime/row-batch-queue.h"
+#include "runtime/row-batch.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
+#include "runtime/scanner-mem-limiter.h"
 #include "util/disk-info.h"
 #include "util/runtime-profile-counters.h"
 
@@ -192,10 +194,12 @@ bool ScanNode::WaitForRuntimeFilters() {
   return false;
 }
 
-void ScanNode::ScannerThreadState::Prepare(ScanNode* parent) {
+void ScanNode::ScannerThreadState::Prepare(
+    ScanNode* parent, int64_t estimated_per_thread_mem) {
+  DCHECK_GT(estimated_per_thread_mem, 0);
   RuntimeProfile* profile = parent->runtime_profile();
-  row_batches_mem_tracker_ = parent->runtime_state_->obj_pool()->Add(new MemTracker(
-        -1, "Queued Batches", parent->mem_tracker(), false));
+  row_batches_mem_tracker_ = parent->runtime_state_->obj_pool()->Add(
+      new MemTracker(-1, "Queued Batches", parent->mem_tracker(), false));
 
   thread_counters_ = ADD_THREAD_COUNTERS(profile, SCANNER_THREAD_COUNTERS_PREFIX);
   num_threads_started_ = ADD_COUNTER(profile, NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
@@ -207,6 +211,12 @@ void ScanNode::ScannerThreadState::Prepare(ScanNode* parent) {
   row_batches_put_timer_ = ADD_TIMER(profile, "RowBatchQueuePutWaitTime");
   row_batches_peak_mem_consumption_ =
       ADD_COUNTER(profile, "RowBatchQueuePeakMemoryUsage", TUnit::BYTES);
+  scanner_thread_mem_unavailable_counter_ =
+      ADD_COUNTER(profile, "NumScannerThreadMemUnavailable", TUnit::UNIT);
+
+  parent->runtime_state()->query_state()->scanner_mem_limiter()->RegisterScan(
+      parent, estimated_per_thread_mem);
+  estimated_per_thread_mem_ = estimated_per_thread_mem;
 }
 
 void ScanNode::ScannerThreadState::Open(
@@ -297,7 +307,7 @@ void ScanNode::ScannerThreadState::Shutdown() {
   if (batch_queue_ != nullptr) batch_queue_->Shutdown();
 }
 
-void ScanNode::ScannerThreadState::Close() {
+void ScanNode::ScannerThreadState::Close(ScanNode* parent) {
   scanner_threads_.JoinAll();
   DCHECK_EQ(num_active_.Load(), 0) << "There should be no active threads";
   if (batch_queue_ != nullptr) {
@@ -306,6 +316,13 @@ void ScanNode::ScannerThreadState::Close() {
     row_batches_peak_mem_consumption_->Set(row_batches_mem_tracker_->peak_consumption());
     batch_queue_->Cleanup();
   }
-  if (row_batches_mem_tracker_ != nullptr) row_batches_mem_tracker_->Close();
+  if (row_batches_mem_tracker_ != nullptr) {
+    row_batches_mem_tracker_->Close();
+  }
+  if (estimated_per_thread_mem_ != 0) {
+    parent->runtime_state()->query_state()->scanner_mem_limiter()
+        ->ReleaseMemoryForScannerThread(parent, estimated_per_thread_mem_);
+    estimated_per_thread_mem_ = 0;
+  }
 }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/7ccf7369/be/src/exec/scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 9f47c45..68a3fad 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -15,9 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
-#ifndef IMPALA_EXEC_SCAN_NODE_H_
-#define IMPALA_EXEC_SCAN_NODE_H_
+#pragma once
 
 #include <string>
 #include "exec/exec-node.h"
@@ -210,7 +208,10 @@ class ScanNode : public ExecNode {
   class ScannerThreadState {
    public:
     /// Called from *ScanNode::Prepare() to initialize counters and MemTracker.
-    void Prepare(ScanNode* parent);
+    /// 'estimated_per_thread_mem' is the estimated memory consumption of each scanner
+    /// thread and must be positive. Prepare() registers the scan with the query-global
+    /// ScannerMemLimiter and accounts for the first thread's memory consumption.
+    void Prepare(ScanNode* parent, int64_t estimated_per_thread_mem);
 
     /// Called from *ScanNode::Open() to create the row batch queue and start periodic
     /// counters running. 'max_row_batches_override' determines size of the row batch
@@ -223,7 +224,7 @@ class ScanNode : public ExecNode {
 
     /// Waits for all scanner threads to finish and cleans up the queue. Called from
     /// *ScanNode::Close(). No other methods can be called after this. Not thread-safe.
-    void Close();
+    void Close(ScanNode* parent);
 
     /// Add a new scanner thread to the thread group. Not thread-safe: only one thread
     /// should call AddThread() at a time.
@@ -255,6 +256,10 @@ class ScanNode : public ExecNode {
     RowBatchQueue* batch_queue() { return batch_queue_.get(); }
     RuntimeProfile::ThreadCounters* thread_counters() const { return thread_counters_; }
     int max_num_scanner_threads() const { return max_num_scanner_threads_; }
+    int64_t estimated_per_thread_mem() const { return estimated_per_thread_mem_; }
+    RuntimeProfile::Counter* scanner_thread_mem_unavailable_counter() const {
+      return scanner_thread_mem_unavailable_counter_;
+    }
 
    private:
     /// Thread group for all scanner threads.
@@ -266,6 +271,11 @@ class ScanNode : public ExecNode {
     /// the number of cores. Set in Open().
     int max_num_scanner_threads_ = 0;
 
+    /// Estimated amount of memory that each additional scanner thread will consume. Used
+    /// to decide whether enough memory is available to create a new scanner thread. Set
+    /// to 0 when the memory for the first thread claimed in Prepare() is released.
+    int64_t estimated_per_thread_mem_ = 0;
+
     // MemTracker for queued row batches. Initialized in Prepare(). Owned by RuntimeState.
     MemTracker* row_batches_mem_tracker_ = nullptr;
 
@@ -305,8 +315,9 @@ class ScanNode : public ExecNode {
 
     /// Peak memory consumption of the materialized batch queue. Updated in Close().
     RuntimeProfile::Counter* row_batches_peak_mem_consumption_ = nullptr;
+
+    /// Number of times scanner threads were not created because of memory not available.
+    RuntimeProfile::Counter* scanner_thread_mem_unavailable_counter_ = nullptr;
   };
 };
 }
-
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/7ccf7369/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index f67b8fe..f4049a0 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -63,6 +63,7 @@ add_library(Runtime
   runtime-filter-bank.cc
   runtime-filter-ir.cc
   runtime-state.cc
+  scanner-mem-limiter.cc
   sorted-run-merger.cc
   sorter.cc
   string-value.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/7ccf7369/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 2ae4a27..4b8924a 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -32,6 +32,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/query-exec-mgr.h"
 #include "runtime/runtime-state.h"
+#include "runtime/scanner-mem-limiter.h"
 #include "util/debug-util.h"
 #include "util/impalad-metrics.h"
 #include "util/thread.h"
@@ -150,6 +151,7 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
       rpc_params.initial_mem_reservation_total_claims));
   RETURN_IF_ERROR(
       initial_reservations_->Init(query_id(), rpc_params.min_mem_reservation_bytes));
+  scanner_mem_limiter_ = obj_pool_.Add(new ScannerMemLimiter);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/7ccf7369/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 607d82a..49fd8eb 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -39,6 +39,7 @@ class InitialReservations;
 class MemTracker;
 class ReservationTracker;
 class RuntimeState;
+class ScannerMemLimiter;
 
 /// Central class for all backend execution state (example: the FragmentInstanceStates
 /// of the individual fragment instances) created for a particular query.
@@ -127,6 +128,7 @@ class QueryState {
   // the following getters are only valid after Init()
   ReservationTracker* buffer_reservation() const { return buffer_reservation_; }
   InitialReservations* initial_reservations() const { return initial_reservations_; }
+  ScannerMemLimiter* scanner_mem_limiter() const { return scanner_mem_limiter_; }
   TmpFileMgr::FileGroup* file_group() const { return file_group_; }
   const TExecQueryFInstancesParams& rpc_params() const { return rpc_params_; }
 
@@ -302,14 +304,18 @@ class QueryState {
   /// TODO: find a way not to have to copy this
   TExecQueryFInstancesParams rpc_params_;
 
-  /// Buffer reservation for this query (owned by obj_pool_). Set in Prepare().
+  /// Buffer reservation for this query (owned by obj_pool_). Set in Init().
   ReservationTracker* buffer_reservation_ = nullptr;
 
   /// Pool of buffer reservations used to distribute initial reservations to operators
   /// in the query. Contains a ReservationTracker that is a child of
-  /// 'buffer_reservation_'. Owned by 'obj_pool_'. Set in Prepare().
+  /// 'buffer_reservation_'. Owned by 'obj_pool_'. Set in Init().
   InitialReservations* initial_reservations_ = nullptr;
 
+  /// Tracks expected memory consumption of all multithreaded scans for this query on
+  /// this daemon. Owned by 'obj_pool_'. Set in Init().
+  ScannerMemLimiter* scanner_mem_limiter_ = nullptr;
+
   /// Number of active fragment instances and coordinators for this query that may consume
   /// resources for query execution (i.e. threads, memory) on the Impala daemon.
   /// Query-wide execution resources for this query are released once this goes to zero.

http://git-wip-us.apache.org/repos/asf/impala/blob/7ccf7369/be/src/runtime/scanner-mem-limiter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/scanner-mem-limiter.cc b/be/src/runtime/scanner-mem-limiter.cc
new file mode 100644
index 0000000..99976d2
--- /dev/null
+++ b/be/src/runtime/scanner-mem-limiter.cc
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/scanner-mem-limiter.h"
+
+#include <boost/thread/locks.hpp>
+
+#include "exec/scan-node.h"
+#include "runtime/mem-tracker.h"
+
+#include "common/names.h"
+
+namespace impala {
+struct ScannerMemLimiter::RegisteredScan {
+  RegisteredScan(int64_t estimated_initial_thread_mem)
+    : estimated_mem(estimated_initial_thread_mem), num_threads(1) {}
+
+  /// The estimated amount of memory to run all threads that have already been started.
+  /// Updated by ClaimMemoryForScannerThread() and ReleaseMemoryForScannerThread().
+  AtomicInt64 estimated_mem;
+
+  /// The number of threads active in the scan node. Starts at 1. Updated by
+  /// ClaimMemoryForScannerThread() and ReleaseMemoryForScannerThread().
+  AtomicInt64 num_threads;
+};
+
+ScannerMemLimiter::ScannerMemLimiter() {}  // Scans are added via RegisterScan().
+
+ScannerMemLimiter::~ScannerMemLimiter() {
+  for (const auto& element : registered_scans_) {
+    const unique_ptr<RegisteredScan>& scan = element.second;
+    DCHECK_EQ(0, scan->estimated_mem.Load());  // all estimated memory was released
+    DCHECK_EQ(0, scan->num_threads.Load());  // no scanner thread still accounted for
+  }
+}
+
+void ScannerMemLimiter::RegisterScan(
+    ScanNode* node, int64_t estimated_initial_thread_mem) {
+  unique_ptr<RegisteredScan> scan(new RegisteredScan(estimated_initial_thread_mem));
+  lock_guard<shared_mutex> write_lock(registered_scans_lock_);  // exclusive: mutates map
+  bool added = registered_scans_.emplace(node, move(scan)).second;
+  DCHECK(added) << node->DebugString();  // fails if 'node' was already registered
+}
+
+bool ScannerMemLimiter::ClaimMemoryForScannerThread(
+    ScanNode* node, int64_t estimated_thread_mem) {
+  shared_lock<shared_mutex> read_lock(registered_scans_lock_);  // map is only read
+  RegisteredScan* found_scan = nullptr;
+  // Calculate the memory consumption in excess of the current consumption that we expect
+  // from already-started threads plus the new thread. We need to compute the global
+  // total across all scans because multiple scans can compete for the same memory.
+  int64_t addtl_consumption = 0;
+  for (const auto& element : registered_scans_) {
+    const unique_ptr<RegisteredScan>& scan = element.second;
+    int64_t consumption = element.first->mem_tracker()->consumption();
+    int64_t num_threads = scan->num_threads.Load();
+    int64_t estimated_mem = scan->estimated_mem.Load();
+    if (consumption > estimated_mem) {
+      // Memory exceeded our estimate. Use a crude heuristic of guessing that the scan
+      // will use up to 50% more memory. This is carried over from old versions of the
+      // code pre-IMPALA-4835, which were initially added in the commit titled
+      // "Dynamically scale down mem usage in scanners and io mgr."
+      if (node == element.first) {
+        // New thread's share of 1.5x consumption. NOTE(review): assumes num_threads > 0.
+        addtl_consumption += static_cast<int64_t>((consumption * 1.5) / num_threads);
+      }
+      // We guess that consumption of existing threads will grow up to 50% above the
+      // current consumption.
+      addtl_consumption += static_cast<int64_t>(consumption * 0.5);
+    } else {
+      // The scan hasn't used all of its estimated memory yet - count the unused part
+      // of the estimate as expected future consumption.
+      addtl_consumption += estimated_mem - consumption;
+      if (node == element.first) addtl_consumption += estimated_thread_mem;
+    }
+    if (node == element.first) found_scan = scan.get();
+  }
+  DCHECK(found_scan != nullptr) << "Increase mem on unregistered scan";
+  // Only proceed if spare soft-limit capacity strictly exceeds the expected increase.
+  if (addtl_consumption >= node->mem_tracker()->SpareCapacity(MemLimit::SOFT)) {
+    return false;
+  }
+  // There is enough memory - account for the new thread's estimate and thread count.
+  found_scan->estimated_mem.Add(estimated_thread_mem);
+  found_scan->num_threads.Add(1);
+  return true;
+}
+
+void ScannerMemLimiter::ReleaseMemoryForScannerThread(
+    ScanNode* node, int64_t estimated_thread_mem) {
+  shared_lock<shared_mutex> read_lock(registered_scans_lock_);  // map is only read
+  auto it = registered_scans_.find(node);
+  DCHECK(it != registered_scans_.end()) << node->id() << " not found.";
+  RegisteredScan* scan = it->second.get();
+  int64_t mem = scan->estimated_mem.Add(-estimated_thread_mem);  // give back estimate
+  DCHECK_GE(mem, 0);  // accounting must not go negative
+  int64_t num_threads = scan->num_threads.Add(-1);
+  DCHECK_GE(num_threads, 0);  // accounting must not go negative
+}
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/7ccf7369/be/src/runtime/scanner-mem-limiter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/scanner-mem-limiter.h b/be/src/runtime/scanner-mem-limiter.h
new file mode 100644
index 0000000..78768fa
--- /dev/null
+++ b/be/src/runtime/scanner-mem-limiter.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/thread/shared_mutex.hpp>
+
+namespace impala {
+class MemTracker;
+class ScanNode;
+
+/// Class to keep track of scanner threads for a query on the current backend.
+/// Tracks the number of threads and their expected memory consumption
+/// for the purpose of limiting the aggregate memory consumption of scanner threads.
+/// This allows us to implement some heuristics that help prevent us going over the
+/// query memory limit by creating too many scanner threads.
+/// TODO: this can be removed once all scan nodes are switched to the MT versions.
+class ScannerMemLimiter {
+ public:
+  ScannerMemLimiter();
+  ~ScannerMemLimiter();
+
+  /// Register a scan and track the estimated memory 'estimated_initial_thread_mem'
+  /// for the initial threads. 'node' must live as long as this object is in use
+  /// (i.e. as long as the below methods are being called). Each 'node' can only
+  /// be registered once.
+  void RegisterScan(ScanNode* node, int64_t estimated_initial_thread_mem);
+
+  /// Returns true if there is enough memory available to create a scanner thread
+  /// for 'node' that is estimated to consume 'estimated_thread_mem' bytes
+  /// of memory. Updates the state of the limiter to reflect the increased memory.
+  /// The caller must call ReleaseMemoryForScannerThread() for every successful
+  /// call to this function.
+  bool ClaimMemoryForScannerThread(ScanNode* node, int64_t estimated_thread_mem);
+
+  /// Must be called when a scanner thread exits. Releases the memory accounted for
+  /// by ClaimMemoryForScannerThread().
+  void ReleaseMemoryForScannerThread(ScanNode* node, int64_t estimated_thread_mem);
+
+ private:
+  struct RegisteredScan;
+
+  /// Protects 'registered_scans_'.
+  boost::shared_mutex registered_scans_lock_;
+
+  /// All of this query's scan nodes. The scan nodes register themselves in their
+  /// Prepare() phase. Threads must hold 'registered_scans_lock_' in shared mode
+  /// when reading or in exclusive mode when modifying. The total memory currently
+  /// consumed by the scans is calculated by iterating over this map, which is
+  /// O(n) in the number of scans. With an unlimited number of thread tokens,
+  /// this would mean we do O(n^2) work per query, but in practice if there are a large
+  /// number of concurrent scans there will be no available thread tokens and
+  /// ClaimMemoryForScannerThread() will not be called.
+  std::unordered_map<ScanNode*, std::unique_ptr<RegisteredScan>> registered_scans_;
+};
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/7ccf7369/testdata/workloads/functional-query/queries/QueryTest/hdfs-scanner-thread-mem-scaling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/hdfs-scanner-thread-mem-scaling.test b/testdata/workloads/functional-query/queries/QueryTest/hdfs-scanner-thread-mem-scaling.test
new file mode 100644
index 0000000..ff3e2fc
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/hdfs-scanner-thread-mem-scaling.test
@@ -0,0 +1,35 @@
+====
+---- QUERY
+# IMPALA-7096: this query will fail if the HDFS scan spins up too many scanner
+# threads, because the threads need to decompress full input files into memory.
+# Only one scanner thread per impalad should be started.
+set num_nodes=1;
+set mem_limit=75m;
+select l_orderkey, l_shipmode from tpch_text_gzip.lineitem
+where l_comment = 'telets. quickly ';
+---- TYPES
+BIGINT,STRING
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+49824,'RAIL'
+1380737,'AIR'
+2981252,'TRUCK'
+3415170,'MAIL'
+---- RUNTIME_PROFILE
+aggregation(SUM, NumScannerThreadsStarted): 1
+====
+---- QUERY
+# IMPALA-7096: check that the Parquet scanner also limits memory consumption.
+set num_nodes=1;
+set mem_limit=50m;
+select l_orderkey, l_shipmode from tpch_parquet.lineitem
+where l_comment = 'telets. quickly ';
+---- TYPES
+BIGINT,STRING
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+49824,'RAIL'
+1380737,'AIR'
+2981252,'TRUCK'
+3415170,'MAIL'
+---- RUNTIME_PROFILE
+aggregation(SUM, NumScannerThreadsStarted): 1
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/7ccf7369/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-mem-usage.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-mem-usage.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-mem-usage.test
new file mode 100644
index 0000000..aa42edb
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-mem-usage.test
@@ -0,0 +1,16 @@
+====
+---- QUERY
+# IMPALA-7096: this query will fail if the Kudu scan spins up too many scanner threads.
+# Only one scanner thread per impalad should be started.
+set mem_limit=4mb;
+select * from tpch_kudu.orders
+order by o_orderkey limit 3;
+---- TYPES
+BIGINT,BIGINT,STRING,DECIMAL,STRING,STRING,STRING,INT,STRING
+---- RESULTS
+1,36901,'O',173665.47,'1996-01-02','5-LOW','Clerk#000000951',0,'nstructions sleep furiously among '
+2,78002,'O',46929.18,'1996-12-01','1-URGENT','Clerk#000000880',0,' foxes. pending accounts at the pending, silent asymptot'
+3,123314,'F',193846.25,'1993-10-14','5-LOW','Clerk#000000955',0,'sly final accounts boost. carefully regular ideas cajole carefully. depos'
+---- RUNTIME_PROFILE
+aggregation(SUM, NumScannerThreadsStarted): 3
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/7ccf7369/tests/query_test/test_mem_usage_scaling.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mem_usage_scaling.py b/tests/query_test/test_mem_usage_scaling.py
index 6535feb..0d3219a 100644
--- a/tests/query_test/test_mem_usage_scaling.py
+++ b/tests/query_test/test_mem_usage_scaling.py
@@ -349,3 +349,16 @@ class TestScanMemLimit(ImpalaTestSuite):
         "refresh {0}.{1}".format(unique_database, TBL))
 
     self.execute_query_expect_success(self.client, SELECT_QUERY, SELECT_OPTIONS)
+
+  def test_kudu_scan_mem_usage(self, vector):
+    """Test that Kudu scans can stay within a low memory limit. Before IMPALA-7096 they
+    were not aware of mem_limit and would start up too many scanner threads."""
+    self.run_test_case('QueryTest/kudu-scan-mem-usage', vector)
+
+  def test_hdfs_scanner_thread_mem_scaling(self, vector):
+    """Test that HDFS scans can stay within a low memory limit. Before IMPALA-7096 they
+    were not aware of non-reserved memory consumption and could start up too many scanner
+    threads."""
+    # Remove num_nodes setting to allow .test file to set num_nodes.
+    del vector.get_value('exec_option')['num_nodes']
+    self.run_test_case('QueryTest/hdfs-scanner-thread-mem-scaling', vector)