You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/02/25 02:10:18 UTC

[impala] branch master updated (b9593d5 -> 285cea9)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from b9593d5  IMPALA-9767: Do not clean up filter while PublishFilter is ongoing
     new 8059af8  IMPALA-10371: test_java_udfs crash impalad if result spooling is enabled
     new c8d94e6  IMPALA-10504: Add tracing for remote block reads
     new 20d4df3  IMPALA-10496: Bump springframework dependency to 4.3.29
     new f13170e  IMPALA-10527: Fix DiskIoMgrTest.WriteToRemotePartialFileSuccess failed in tsan build
     new 285cea9  IMPALA-10526: Fix BufferPoolTest.Multi8RandomSpillToRemoteMix failed in sanitizer builds

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exprs/hive-udf-call-ir.cc       |  2 +-
 be/src/exprs/hive-udf-call.cc          | 20 ++++++++---------
 be/src/exprs/hive-udf-call.h           | 12 ++++++++++-
 be/src/runtime/io/hdfs-file-reader.cc  | 39 ++++++++++++++++++++++++++++++++++
 be/src/runtime/io/hdfs-file-reader.h   |  5 +++++
 be/src/runtime/io/request-ranges.h     |  6 ++++--
 be/src/runtime/io/scan-range.cc        | 31 +++++++++++++++++----------
 be/src/runtime/tmp-file-mgr-internal.h |  3 +++
 be/src/runtime/tmp-file-mgr.cc         | 23 ++++++++++++++------
 fe/pom.xml                             |  9 ++++++++
 java/pom.xml                           |  3 ++-
 11 files changed, 120 insertions(+), 33 deletions(-)


[impala] 03/05: IMPALA-10496: Bump springframework dependency to 4.3.29

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 20d4df365196a04d061a1d19520580847de952e5
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Tue Feb 23 20:38:06 2021 +0100

    IMPALA-10496: Bump springframework dependency to 4.3.29
    
    Impala has depended on springframework 4.3.19 transitively through
    pac4j since pac4j was added as a dependency.
    
    Testing:
    - used dependency-check-maven plugin to check that the CVEs
      related to springframework disappear
    
    Change-Id: I81a2b00a0dd1b1560fa97a13ccf4cf6bb69b4b51
    Reviewed-on: http://gerrit.cloudera.org:8080/17112
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 fe/pom.xml   | 9 +++++++++
 java/pom.xml | 3 ++-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/fe/pom.xml b/fe/pom.xml
index 75fcf19..70579e3 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -531,6 +531,10 @@ under the License.
           <groupId>xalan</groupId>
           <artifactId>xalan</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.springframework</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -543,6 +547,11 @@ under the License.
       <artifactId>xmlsec</artifactId>
       <version>${xmlsec.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-core</artifactId>
+      <version>${springframework.version}</version>
+    </dependency>
   </dependencies>
 
   <reporting>
diff --git a/java/pom.xml b/java/pom.xml
index 32d4bdf..bc936fd 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -65,10 +65,11 @@ under the License.
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <iceberg.version>${env.IMPALA_ICEBERG_VERSION}</iceberg.version>
     <pac4j.version>4.0.3</pac4j.version>
-    <!-- xmlsec and bcprov-jdk15on are not used by Impala directly,
+    <!-- xmlsec, bcprov-jdk15on and springframework are not used by Impala directly,
          but needed to replace pac4j 4.0.3's unsafe versions -->
     <xmlsec.version>2.2.1</xmlsec.version>
     <bcprov-jdk15on.version>1.64</bcprov-jdk15on.version>
+    <springframework.version>4.3.29.RELEASE</springframework.version>
   </properties>
 
   <repositories>


[impala] 01/05: IMPALA-10371: test_java_udfs crash impalad if result spooling is enabled

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8059af8cee3563b42ffe04179add49c15c8067a8
Author: Daniel Becker <da...@cloudera.com>
AuthorDate: Wed Feb 10 11:33:10 2021 +0100

    IMPALA-10371: test_java_udfs crash impalad if result spooling is enabled
    
    IMPALA-7658 introduced proper codegen for HiveUdfCall. Because of a bug
    in LLVM (see https://bugs.llvm.org/show_bug.cgi?id=21431), codegen code
    could not use JniUtil::GetJNIEnv directly as it involves thread-local
    variables, which LLVM JIT does not (yet) support. The original solution
    to get around this problem was to cache the JNIEnv pointer in the
    FunctionContext, but it turned out that this leads to a crash if result
    spooling is enabled because multiple threads can end up using the same
    JNIEnv object.
    
    This commit fixes the problem by using a different solution: instead of
    caching the JNIEnv pointer we use a wrapper function
    (HiveUdfCall::GetJniEnvNotInlined) that prevents JniUtil::GetJNIEnv from
    being inlined in codegen code, thereby ensuring that the handling of the
    thread-local variable is compiled by GCC.
    
    Testing:
      Manually verified that TestUdfExecution::test_java_udfs passes with
      SPOOL_QUERY_RESULTS enabled.
    
    Change-Id: Ie3aadb8ccc0f1f9b7b87a5744c22a0555b325ee6
    Reviewed-on: http://gerrit.cloudera.org:8080/17106
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/hive-udf-call-ir.cc |  2 +-
 be/src/exprs/hive-udf-call.cc    | 20 ++++++++++----------
 be/src/exprs/hive-udf-call.h     | 12 +++++++++++-
 3 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/be/src/exprs/hive-udf-call-ir.cc b/be/src/exprs/hive-udf-call-ir.cc
index 4b69233..b4e88b7 100644
--- a/be/src/exprs/hive-udf-call-ir.cc
+++ b/be/src/exprs/hive-udf-call-ir.cc
@@ -51,7 +51,7 @@ uint8_t* HiveUdfCall::JniContext::GetInputValuesBufferAtOffset(
 AnyVal* HiveUdfCall::CallJavaAndStoreResult(const ColumnType* type,
     FunctionContext* fn_ctx, JniContext* jni_ctx) {
   DCHECK(jni_ctx != nullptr);
-  JNIEnv* env = jni_ctx->jni_env;
+  JNIEnv* env = GetJniEnvNotInlined();
   DCHECK(env != nullptr);
 
   // Using this version of Call has the lowest overhead. This eliminates the
diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc
index b903891..a200d37 100644
--- a/be/src/exprs/hive-udf-call.cc
+++ b/be/src/exprs/hive-udf-call.cc
@@ -62,7 +62,7 @@ AnyVal* HiveUdfCall::Evaluate(ScalarExprEvaluator* eval, const TupleRow* row) co
       fn_ctx->GetFunctionState(FunctionContext::THREAD_LOCAL));
   DCHECK(jni_ctx != nullptr);
 
-  JNIEnv* env = jni_ctx->jni_env;
+  JNIEnv* env = JniUtil::GetJNIEnv();
   DCHECK(env != nullptr);
 
   // Evaluate all the children values and put the results in input_values_buffer
@@ -177,11 +177,6 @@ Status HiveUdfCall::OpenEvaluator(FunctionContext::FunctionStateScope scope,
   JNIEnv* env = JniUtil::GetJNIEnv();
   if (env == nullptr) return Status("Failed to get/create JVM");
 
-  // Cache the JNI env pointer here so it can be retrieved by codegen at execution time.
-  // LLVM can't handle thread-local variables (yet) so we cannot call JniUtil::GetJNIEnv()
-  // from codegen code.
-  jni_ctx->jni_env = env;
-
   // Fields used for error reporting.
   // This object and thus fn_ are alive when rows are evaluated.
   jni_ctx->hdfs_location = fn_.hdfs_location.c_str();
@@ -331,7 +326,7 @@ llvm::Value* CastPtrAndLoad(LlvmCodeGen* codegen, LlvmBuilder* builder,
 /// }
 ///
 /// define { i64, i8* } @HiveUdfCall(%"class.impala::ScalarExprEvaluator"* %eval,
-///                                  %"class.impala::TupleRow"* %row) #46 {
+///                                  %"class.impala::TupleRow"* %row) #49 {
 /// entry:
 ///   %0 = alloca %"struct.impala::ColumnType"
 ///   %1 = alloca %"struct.impala::StringValue"
@@ -373,7 +368,7 @@ llvm::Value* CastPtrAndLoad(LlvmCodeGen* codegen, LlvmBuilder* builder,
 ///   br label %eval_child1
 ///
 /// eval_child1:                                 ; preds = %child_not_null, %eval_child
-///   %child4 = call { i64, i8* } @"impala::CastFunctions::CastToStringValWrapper"(
+///   %child4 = call { i64, i8* } @GetSlotRef.5(
 ///       %"class.impala::ScalarExprEvaluator"* %eval, %"class.impala::TupleRow"* %row)
 ///   %11 = extractvalue { i64, i8* } %child4, 0
 ///   %child_is_null6 = trunc i64 %11 to i1
@@ -400,7 +395,7 @@ llvm::Value* CastPtrAndLoad(LlvmCodeGen* codegen, LlvmBuilder* builder,
 ///   br label %eval_child3
 ///
 /// eval_child3:                                 ; preds = %child_not_null5, %eval_child1
-///   %child11 = call { i64, i8* } @"impala::CastFunctions::CastToStringVal.5Wrapper"(
+///   %child11 = call { i64, i8* } @GetSlotRef.6(
 ///       %"class.impala::ScalarExprEvaluator"* %eval, %"class.impala::TupleRow"* %row)
 ///   %18 = extractvalue { i64, i8* } %child11, 0
 ///   %child_is_null13 = trunc i64 %18 to i1
@@ -430,7 +425,8 @@ llvm::Value* CastPtrAndLoad(LlvmCodeGen* codegen, LlvmBuilder* builder,
 ///   store %"struct.impala::ColumnType" {
 ///       i32 10, i32 -1, i32 -1, i32 -1,
 ///       %"class.std::vector.13" zeroinitializer,
-///       %"class.std::vector.18" zeroinitializer },
+///       %"class.std::vector.18" zeroinitializer,
+///       %"class.std::vector.23" zeroinitializer },
 ///       %"struct.impala::ColumnType"* %0
 ///   %ret_ptr = call %"struct.impala_udf::AnyVal"*
 ///   ; The next two lines should be one line but the name of the identifier is too long.
@@ -578,4 +574,8 @@ DateVal HiveUdfCall::GetDateValInterpreted(
   return *reinterpret_cast<DateVal*>(Evaluate(eval, row));
 }
 
+JNIEnv* HiveUdfCall::GetJniEnvNotInlined() {
+  return JniUtil::GetJNIEnv();
+}
+
 }
diff --git a/be/src/exprs/hive-udf-call.h b/be/src/exprs/hive-udf-call.h
index 186857b..e15c539 100644
--- a/be/src/exprs/hive-udf-call.h
+++ b/be/src/exprs/hive-udf-call.h
@@ -129,7 +129,6 @@ class HiveUdfCall : public ScalarExpr {
   static jmethodID executor_close_id_;
 
   struct JniContext {
-    JNIEnv* jni_env = nullptr;
     jobject executor = nullptr;
 
     uint8_t* input_values_buffer = nullptr;
@@ -160,6 +159,17 @@ class HiveUdfCall : public ScalarExpr {
   static JNIEnv* GetJniEnv(JniContext* jni_ctx);
   static AnyVal* CallJavaAndStoreResult(const  ColumnType* type, FunctionContext* fn_ctx,
       JniContext* jni_ctx);
+
+  /// Codegen code cannot directly call JniUtil::GetJNIEnv() because LLVM JIT cannot
+  /// handle thread-local variables (see https://bugs.llvm.org/show_bug.cgi?id=21431).
+  /// The problem is that a call to JniUtil::GetJNIEnv can be inlined in codegen code,
+  /// leading to a crash. This wrapper function calls JniUtil::GetJNIEnv but cannot be
+  /// inlined, ensuring that the contents of JniUtil::GetJNIEnv, which deal with the
+  /// thread-local variable, are compiled by GCC.
+  /// Codegen code can then use this wrapper function so the resulting generated code
+  /// will contain an actual function call instruction to the pre-compiled
+  /// JniUtil::GetJNIEnv.
+  static __attribute__((noinline)) JNIEnv* GetJniEnvNotInlined();
 };
 
 }


[impala] 05/05: IMPALA-10526: Fix BufferPoolTest.Multi8RandomSpillToRemoteMix failed in sanitizer builds

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 285cea92910ee59a96a8c9c78c74272dedcbf1f9
Author: Yida Wu <wy...@gmail.com>
AuthorDate: Sat Feb 20 13:09:57 2021 -0800

    IMPALA-10526: Fix BufferPoolTest.Multi8RandomSpillToRemoteMix failed in sanitizer builds
    
    Fixed a data race when running the testcase
    BufferPoolTest.Multi8RandomSpillToRemoteMix in the tsan build. The
    solution is to access ScanRange::use_local_buffer_ under
    ScanRange::lock_.
    
    Tests:
    Reran the testcase BufferPoolTest.Multi8RandomSpillToRemoteMix.
    
    Change-Id: Ic77dd85d20efc7066758b2ba61e2138745cbd90b
    Reviewed-on: http://gerrit.cloudera.org:8080/17096
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/io/request-ranges.h |  6 ++++--
 be/src/runtime/io/scan-range.cc    | 31 ++++++++++++++++++++-----------
 2 files changed, 24 insertions(+), 13 deletions(-)

diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index dca4398..d70304e 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -375,7 +375,9 @@ class ScanRange : public RequestRange {
   ReadOutcome DoRead(DiskQueue* queue, int disk_id);
 
   /// The function runs the actual read logic to read content with the specific reader.
-  ReadOutcome DoReadInternal(DiskQueue* queue, int disk_id, FileReader* file_reader);
+  /// If use_local_buffer is true, it will read from the local buffer with the local
+  /// buffer reader.
+  ReadOutcome DoReadInternal(DiskQueue* queue, int disk_id, bool use_local_buffer);
 
   /// Cleans up a buffer that was not returned to the client.
   /// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor.
@@ -585,7 +587,7 @@ class ScanRange : public RequestRange {
   /// If set to true, the scan range is using local_buffer_reader_ to do scan operations.
   /// The flag is set during DoRead(). If the path is a remote path and the file has
   /// a local buffer, the flag is set to true, otherwise the flag is false.
-  bool use_local_buffer_{false};
+  bool use_local_buffer_ = false;
 
   /// If not empty, the ScanRange will only read these parts from the file.
   std::vector<SubRange> sub_ranges_;
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 569ef57..d1d75c9 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -164,11 +164,12 @@ unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer(
 }
 
 ReadOutcome ScanRange::DoReadInternal(
-    DiskQueue* queue, int disk_id, FileReader* file_reader) {
+    DiskQueue* queue, int disk_id, bool use_local_buff) {
   int64_t bytes_remaining = bytes_to_read_ - bytes_read_;
   DCHECK_GT(bytes_remaining, 0);
 
   unique_ptr<BufferDescriptor> buffer_desc;
+  FileReader* file_reader = nullptr;
   {
     unique_lock<mutex> lock(lock_);
     DCHECK(!read_in_flight_);
@@ -190,7 +191,15 @@ ReadOutcome ScanRange::DoReadInternal(
       iomgr_buffer_cumulative_bytes_used_ += buffer_desc->buffer_len();
     }
     read_in_flight_ = true;
+    if (use_local_buff) {
+      file_reader = local_buffer_reader_.get();
+      file_ = disk_buffer_file_->path();
+    } else {
+      file_reader = file_reader_.get();
+    }
+    use_local_buffer_ = use_local_buff;
   }
+  DCHECK(file_reader != nullptr);
 
   // No locks in this section.  Only working on local vars.  We don't want to hold a
   // lock across the read call.
@@ -265,7 +274,7 @@ ReadOutcome ScanRange::DoReadInternal(
 }
 
 ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
-  FileReader* file_reader = file_reader_.get();
+  bool use_local_buffer = false;
   if (disk_file_ != nullptr && disk_file_->disk_type() != DiskFileType::LOCAL) {
     // The sequence for acquiring the locks should always be from the local to
     // the remote to avoid deadlocks.
@@ -283,18 +292,15 @@ ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
       // If the local buffer exists, we can read from the local buffer, otherwise,
       // we will read from the remote file system.
       if (!disk_buffer_file_->is_deleted(buffer_file_lock)) {
-        file_reader = local_buffer_reader_.get();
-        file_ = disk_buffer_file_->path();
-        use_local_buffer_ = true;
+        use_local_buffer = true;
       } else {
         // Read from the remote file. The remote file must be in persisted status.
         DCHECK(disk_file_->is_persisted(file_lock));
-        use_local_buffer_ = false;
       }
     }
-    return DoReadInternal(queue, disk_id, file_reader);
+    return DoReadInternal(queue, disk_id, use_local_buffer);
   }
-  return DoReadInternal(queue, disk_id, file_reader);
+  return DoReadInternal(queue, disk_id, use_local_buffer);
 }
 
 Status ScanRange::ReadSubRanges(
@@ -382,13 +388,13 @@ void ScanRange::Cancel(const Status& status) {
 void ScanRange::CancelInternal(const Status& status, bool read_error) {
   DCHECK(io_mgr_ != nullptr);
   DCHECK(!status.ok());
-  FileReader* file_reader =
-      use_local_buffer_ ? local_buffer_reader_.get() : file_reader_.get();
+  FileReader* file_reader = nullptr;
   {
     // Grab both locks to make sure that we don't change 'cancel_status_' while other
     // threads are in critical sections.
     unique_lock<mutex> scan_range_lock(lock_);
     {
+      file_reader = use_local_buffer_ ? local_buffer_reader_.get() : file_reader_.get();
       unique_lock<SpinLock> fs_lock(file_reader->lock());
       DCHECK(Validate()) << DebugString();
       // If already cancelled, preserve the original reason for cancellation. Most of the
@@ -418,7 +424,10 @@ void ScanRange::CancelInternal(const Status& status, bool read_error) {
   // TODO: IMPALA-4249 - this Close() call makes it unsafe to reuse a cancelled scan
   // range, because there is no synchronisation between this Close() call and the
   // client adding the ScanRange back into the IoMgr.
-  if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) file_reader->Close();
+  if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) {
+    DCHECK(file_reader != nullptr);
+    file_reader->Close();
+  }
 }
 
 void ScanRange::WaitForInFlightRead() {


[impala] 02/05: IMPALA-10504: Add tracing for remote block reads

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c8d94e6e365ca21d731617767bd7793f67f32222
Author: Kurt Deschler <kd...@cloudera.com>
AuthorDate: Sun Sep 27 16:37:32 2020 -0500

    IMPALA-10504: Add tracing for remote block reads
    
    This patch logs metadata for the first unexpected remote read of each
    scan range when the flag fs_trace_remote_reads is set to true. This
    logging is intended to help diagnose the root cause of remote reads.
    
    Since a message may be logged for each scan range, there could be
    several hundred lines of output in a degenerate case. However, the
    remote read condition is not expected and verbose output may be needed
    to diagnose the root cause.
    
    Reviewed-by: Aman Sinha <am...@cloudera.com>
    Change-Id: I8c6a3e92f44813048022edf2b91299b3b0a20257
    Reviewed-on: http://gerrit.cloudera.org:8080/17062
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/io/hdfs-file-reader.cc | 39 +++++++++++++++++++++++++++++++++++
 be/src/runtime/io/hdfs-file-reader.h  |  5 +++++
 2 files changed, 44 insertions(+)

diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index 1441a2d..5683c5d 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -42,6 +42,9 @@ DEFINE_int64(fs_slow_read_log_threshold_ms, 10L * 1000L,
     "Log diagnostics about I/Os issued via the HDFS client that take longer than this "
     "threshold.");
 
+DEFINE_bool(fs_trace_remote_reads, false,
+    "(Advanced) Log block locations for remote reads.");
+
 #ifndef NDEBUG
 DECLARE_int32(stress_disk_read_delay_ms);
 #endif
@@ -78,6 +81,30 @@ Status HdfsFileReader::Open(bool use_file_handle_cache) {
   return Status::OK();
 }
 
+std::string HdfsFileReader::GetHostList(int64_t file_offset,
+    int64_t bytes_to_read) const {
+  char*** hosts = hdfsGetHosts(hdfs_fs_, scan_range_->file_string()->c_str(),
+      file_offset, bytes_to_read);
+  if (hosts) {
+    std::ostringstream ostr;
+    int blockIndex = 0;
+    while (hosts[blockIndex]) {
+      ostr << " [" << blockIndex << "] { ";
+      int hostIndex = 0;
+      while (hosts[blockIndex][hostIndex]) {
+        if(hostIndex > 0) ostr << ", ";
+        ostr << hosts[blockIndex][hostIndex];
+        hostIndex++;
+      }
+      ostr << " }";
+      blockIndex++;
+    }
+    hdfsFreeHosts(hosts);
+    return ostr.str();
+  }
+  return "";
+}
+
 Status HdfsFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t* buffer,
     int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
   DCHECK(scan_range_->read_in_flight());
@@ -209,8 +236,20 @@ Status HdfsFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_
       }
       *bytes_read += current_bytes_read;
 
+      bool is_first_read = (num_remote_bytes_ == 0);
       // Collect and accumulate statistics
       GetHdfsStatistics(hdfs_file, log_slow_read);
+      if (FLAGS_fs_trace_remote_reads && expected_local_ &&
+          num_remote_bytes_ > 0 && is_first_read) {
+        // Only log the first unexpected remote read for scan range
+        LOG(INFO)
+            << "First remote read of scan range on file "
+            << *scan_range_->file_string()
+            << " " << num_remote_bytes_
+            << " bytes. offsets " << file_offset
+            << "-" << file_offset+bytes_to_read-1
+            << GetHostList(file_offset, bytes_to_read);
+      }
     }
 
     int64_t cached_bytes_missed = *bytes_read - cached_read;
diff --git a/be/src/runtime/io/hdfs-file-reader.h b/be/src/runtime/io/hdfs-file-reader.h
index 29f0e2c..ed8acc7 100644
--- a/be/src/runtime/io/hdfs-file-reader.h
+++ b/be/src/runtime/io/hdfs-file-reader.h
@@ -78,6 +78,11 @@ private:
   /// true, the statistics are logged.
   void GetHdfsStatistics(hdfsFile hdfs_file, bool log_stats);
 
+  /// Return a string that contains the block indexes and list of hosts where
+  /// each block resides. i.e. [0] { hdfshost1, hdfshost2, hdfshost3 }
+
+  std::string GetHostList(int64_t file_offset, int64_t bytes_to_read) const;
+
   /// Hadoop filesystem that contains the file being read.
   hdfsFS const hdfs_fs_;
 


[impala] 04/05: IMPALA-10527: Fix DiskIoMgrTest.WriteToRemotePartialFileSuccess failed in tsan build

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f13170ef655e92e624ac65a381dd42bba6c084e5
Author: Yida Wu <wy...@gmail.com>
AuthorDate: Sun Feb 21 06:59:46 2021 -0800

    IMPALA-10527: Fix DiskIoMgrTest.WriteToRemotePartialFileSuccess failed in tsan build
    
    Fixed a data race when running the testcase
    DiskIoMgrTest.WriteToRemotePartialFileSuccess in the tsan build. The
    cause was that the TmpFileBufferPool was not shut down gracefully
    during destruction.
    
    Tests:
    Reran all testcases of DiskIoMgrTest with the tsan and asan builds.
    
    Change-Id: I6b0435bf0ee580acb5553527c0ed4f3aa806707f
    Reviewed-on: http://gerrit.cloudera.org:8080/17100
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/tmp-file-mgr-internal.h |  3 +++
 be/src/runtime/tmp-file-mgr.cc         | 23 ++++++++++++++++-------
 2 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h
index bd75239..1df4fe4 100644
--- a/be/src/runtime/tmp-file-mgr-internal.h
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -340,6 +340,9 @@ class TmpFileBufferPool {
   // from the available pool and make room for other files' buffer.
   Status DequeueTmpFilesPool(std::shared_ptr<TmpFile>* tmp_file, bool quick_return);
 
+  // Shut down the pool before destruction.
+  void ShutDown();
+
  private:
   friend class TmpFileMgr;
   friend class TmpFileMgrTest;
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 676f646..cd00f42 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -168,7 +168,12 @@ using WriteDoneCallback = TmpFileMgr::WriteDoneCallback;
 
 TmpFileMgr::TmpFileMgr() {}
 
-TmpFileMgr::~TmpFileMgr() {}
+TmpFileMgr::~TmpFileMgr() {
+  if(tmp_dirs_remote_ctrl_.tmp_file_pool_ != nullptr) {
+    tmp_dirs_remote_ctrl_.tmp_file_pool_->ShutDown();
+    tmp_dirs_remote_ctrl_.tmp_file_mgr_thread_group_.JoinAll();
+  }
+}
 
 Status TmpFileMgr::Init(MetricGroup* metrics) {
   return InitCustom(FLAGS_scratch_dirs, !FLAGS_allow_multiple_scratch_dirs_per_device,
@@ -321,7 +326,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
     bool is_s3a_path = IsS3APath(tmp_path.c_str(), false);
     if (is_hdfs_path || is_s3a_path) {
       // Only support one remote dir.
-      if (tmp_dirs_remote_ != nullptr) {
+      if (HasRemoteDir()) {
         LOG(WARNING) << "Only one remote directory is supported. Directory "
                      << tmp_path.c_str() << " is abandoned.";
         continue;
@@ -379,7 +384,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
   std::sort(tmp_dirs_.begin(), tmp_dirs_.end(),
       [](const TmpDir& a, const TmpDir& b) { return a.priority < b.priority; });
 
-  if (tmp_dirs_remote_ != nullptr) {
+  if (HasRemoteDir()) {
     if (local_buff_dir_ == nullptr) {
       // Should at least have one local dir for the buffer. Later we might allow to use
       // s3 fast upload directly without a buffer.
@@ -400,15 +405,15 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
       metrics->AddGauge(TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS, 0);
   active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister(
       metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
-  if (tmp_dirs_remote_ == nullptr) {
-    num_active_scratch_dirs_metric_->SetValue(tmp_dirs_.size());
-  } else {
+  if (HasRemoteDir()) {
     num_active_scratch_dirs_metric_->SetValue(tmp_dirs_.size() + 1);
+  } else {
+    num_active_scratch_dirs_metric_->SetValue(tmp_dirs_.size());
   }
   for (int i = 0; i < tmp_dirs_.size(); ++i) {
     active_scratch_dirs_metric_->Add(tmp_dirs_[i].path);
   }
-  if (tmp_dirs_remote_ != nullptr) {
+  if (HasRemoteDir()) {
     active_scratch_dirs_metric_->Add(tmp_dirs_remote_->path);
     RETURN_IF_ERROR(CreateTmpFileBufferPoolThread(metrics));
   }
@@ -1740,6 +1745,10 @@ TmpFileBufferPool::TmpFileBufferPool(TmpFileMgr* tmp_file_mgr)
 }
 
 TmpFileBufferPool::~TmpFileBufferPool() {
+  DCHECK(shut_down_);
+}
+
+void TmpFileBufferPool::ShutDown() {
   {
     unique_lock<mutex> l(lock_);
     shut_down_ = true;