You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2018/10/18 21:32:40 UTC

[1/4] impala git commit: IMPALA-7689: Reduce per column per partition stats estimate size

Repository: impala
Updated Branches:
  refs/heads/master 6399a65a0 -> 9f5c5e6df


IMPALA-7689: Reduce per column per partition stats estimate size

With the improvements in the incremental stats memory representation
(IMPALA-7424), the per column per partition stats estimate should be
reduced to account for the compressed memory footprint. In experiments
on various test tables, I see the size drop by 50-70%.

This patch conservatively reduces the size estimate by 50%. Ideally we
wouldn't need to estimate on the Catalog server during serialization,
since we can compute the byte sizes by looping through all the
partitions. However, this patch retains the current logic to keep it
consistent with the "compute incremental stats" analysis.

Change-Id: I347b41d9b298d7cd73ec812692172e0511415eee
Reviewed-on: http://gerrit.cloudera.org:8080/11706
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/5af5456a
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/5af5456a
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/5af5456a

Branch: refs/heads/master
Commit: 5af5456a2d95a43ce63f4e364ff0b9631729bb1a
Parents: 6399a65
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Tue Oct 16 18:26:13 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Oct 18 00:57:47 2018 +0000

----------------------------------------------------------------------
 fe/src/main/java/org/apache/impala/catalog/HdfsTable.java | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/5af5456a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 88c1fd5..1bca8ec 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -195,8 +195,9 @@ public class HdfsTable extends Table implements FeFsTable {
   @VisibleForTesting
   HdfsPartition prototypePartition_;
 
-  // Estimate (in bytes) of the incremental stats size per column per partition
-  public static final long STATS_SIZE_PER_COLUMN_BYTES = 400;
+  // Empirical estimate (in bytes) of the incremental stats size per column per
+  // partition.
+  public static final long STATS_SIZE_PER_COLUMN_BYTES = 200;
 
   // Bi-directional map between an integer index and a unique datanode
   // TNetworkAddresses, each of which contains blocks of 1 or more
@@ -1753,8 +1754,6 @@ public class HdfsTable extends Table implements FeFsTable {
    * fetch from catalogd).
    */
   private boolean shouldSendIncrementalStats(int numPartitions) {
-    // TODO(bharath): Revisit the constant STATS_SIZE_PER_COLUMN_BYTES after the
-    // new incremental stats in-memory representation changes.
     long statsSizeEstimate =
         numPartitions * getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES;
     return statsSizeEstimate < BackendConfig.INSTANCE.getIncStatsMaxSize()


[3/4] impala git commit: IMPALA-7543: Enhance scan ranges to support sub-ranges

Posted by mi...@apache.org.
IMPALA-7543: Enhance scan ranges to support sub-ranges

This commit enhances the ScanRange class to make it possible to read
only some smaller parts of the whole ScanRange. This functionality
is needed by IMPALA-5843.

A sub-range is an offset and length located within the scan range.
Sub-ranges can be added to a scan range when calling
ScanRange::Reset(). If any are given, the ScanRange class will only
read the parts defined by the sub-ranges.

If a cached read has sub-ranges, the ScanRange won't enqueue the
whole cache buffer (which contains the whole ScanRange); instead it
memcpy()s the sub-ranges into IO/client buffers.

Smaller refactorings this required:
 * removed scan_range_offset_ from BufferDescriptor
 * the number of bytes read is tracked by ScanRange again

Testing:
 * introduced CacheReaderTestStub to fake cache reads during testing
 * extended disk-io-mgr-test.cc with sub-ranges

Change-Id: Iea26ba386713990f7671aab5a372cf449b8d51e4
Reviewed-on: http://gerrit.cloudera.org:8080/11520
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/48fb4902
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/48fb4902
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/48fb4902

Branch: refs/heads/master
Commit: 48fb4902d4f28c9e4d327b80b00b33962c118c22
Parents: 2fb8eba
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Wed Sep 26 13:58:36 2018 +0200
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Oct 18 20:33:36 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/io/CMakeLists.txt           |   1 -
 be/src/runtime/io/cache-reader-test-stub.h |  62 +++++++++
 be/src/runtime/io/disk-io-mgr-stress.cc    |   3 +-
 be/src/runtime/io/disk-io-mgr-test.cc      | 177 +++++++++++++++++++----
 be/src/runtime/io/disk-io-mgr.cc           |   2 +-
 be/src/runtime/io/file-reader.cc           |  31 -----
 be/src/runtime/io/file-reader.h            |  23 ++-
 be/src/runtime/io/hdfs-file-reader.cc      |  32 ++---
 be/src/runtime/io/hdfs-file-reader.h       |   4 +-
 be/src/runtime/io/local-file-reader.cc     |  15 +-
 be/src/runtime/io/local-file-reader.h      |   5 +-
 be/src/runtime/io/request-context.cc       |  41 +++++-
 be/src/runtime/io/request-context.h        |   8 ++
 be/src/runtime/io/request-ranges.h         |  87 ++++++++++--
 be/src/runtime/io/scan-range.cc            | 178 +++++++++++++++++++-----
 15 files changed, 511 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/CMakeLists.txt b/be/src/runtime/io/CMakeLists.txt
index bb26ef0..675bd6e 100644
--- a/be/src/runtime/io/CMakeLists.txt
+++ b/be/src/runtime/io/CMakeLists.txt
@@ -29,7 +29,6 @@ add_library(Io
   error-converter.cc
   request-context.cc
   scan-range.cc
-  file-reader.cc
   hdfs-file-reader.cc
   local-file-reader.cc
 )

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/cache-reader-test-stub.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/cache-reader-test-stub.h b/be/src/runtime/io/cache-reader-test-stub.h
new file mode 100644
index 0000000..b53b30b
--- /dev/null
+++ b/be/src/runtime/io/cache-reader-test-stub.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/io/file-reader.h"
+#include "runtime/io/request-ranges.h"
+
+namespace impala {
+namespace io {
+
+/// Only for testing the code path when reading from the cache is successful.
+/// Takes a pointer to a buffer in its constructor, also the length of this buffer.
+/// CachedFile() simply returns the pointer and length.
+/// Invoking ReadFromPos() on it results in an error.
+class CacheReaderTestStub : public FileReader {
+public:
+  CacheReaderTestStub(ScanRange* scan_range, uint8_t* cache, int64_t length) :
+    FileReader(scan_range),
+    cache_(cache),
+    length_(length) {
+  }
+
+  ~CacheReaderTestStub() {}
+
+  virtual Status Open(bool use_file_handle_cache) override {
+    return Status::OK();
+  }
+
+  virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override {
+    DCHECK(false);
+    return Status("Not implemented");
+  }
+
+  virtual void CachedFile(uint8_t** data, int64_t* length) override {
+    *length = length_;
+    *data = cache_;
+  }
+
+  virtual void Close() override {}
+private:
+  uint8_t* cache_ = nullptr;
+  int64_t length_ = 0;
+};
+
+}
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index 16cfbf2..4335aa4 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -119,6 +119,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 
     while (!eos) {
       ScanRange* range;
+      int64_t scan_range_offset = 0;
       bool needs_buffers;
       Status status = client->reader->GetNextUnstartedRange(&range, &needs_buffers);
       CHECK(status.ok() || status.IsCancelled());
@@ -135,7 +136,6 @@ void DiskIoMgrStress::ClientThread(int client_id) {
         CHECK(status.ok() || status.IsCancelled());
         if (buffer == NULL) break;
 
-        int64_t scan_range_offset = buffer->scan_range_offset();
         int len = buffer->len();
         CHECK_GE(scan_range_offset, 0);
         CHECK_LT(scan_range_offset, expected.size());
@@ -154,6 +154,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
         memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
         range->ReturnBuffer(move(buffer));
         bytes_read += len;
+        scan_range_offset += len;
 
         CHECK_GE(bytes_read, 0);
         CHECK_LE(bytes_read, expected.size());

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index a6689a8..7a9bc23 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -24,6 +24,7 @@
 #include "common/init.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/io/cache-reader-test-stub.h"
 #include "runtime/io/local-file-system-with-fault-injection.h"
 #include "runtime/io/disk-io-mgr-stress.h"
 #include "runtime/io/disk-io-mgr.h"
@@ -179,7 +180,7 @@ class DiskIoMgrTest : public testing::Test {
     }
     ASSERT_OK(range->GetNext(&buffer));
     ASSERT_TRUE(buffer != nullptr);
-    EXPECT_EQ(buffer->len(), range->len());
+    EXPECT_EQ(buffer->len(), range->bytes_to_read());
     if (expected_len < 0) expected_len = strlen(expected);
     int cmp = memcmp(buffer->buffer(), expected, expected_len);
     EXPECT_TRUE(cmp == 0);
@@ -190,6 +191,7 @@ class DiskIoMgrTest : public testing::Test {
       const char* expected, int expected_len, const Status& expected_status) {
     char result[expected_len + 1];
     memset(result, 0, expected_len + 1);
+    int64_t scan_range_offset = 0;
 
     while (true) {
       unique_ptr<BufferDescriptor> buffer;
@@ -200,8 +202,9 @@ class DiskIoMgrTest : public testing::Test {
         break;
       }
       ASSERT_LE(buffer->len(), expected_len);
-      memcpy(result + range->offset() + buffer->scan_range_offset(),
+      memcpy(result + range->offset() + scan_range_offset,
           buffer->buffer(), buffer->len());
+      scan_range_offset += buffer->len();
       range->ReturnBuffer(move(buffer));
     }
     ValidateEmptyOrCorrect(expected, result, expected_len);
@@ -229,11 +232,16 @@ class DiskIoMgrTest : public testing::Test {
     }
   }
 
+  static void SetReaderStub(ScanRange* scan_range, unique_ptr<FileReader> reader_stub) {
+    scan_range->SetFileReader(move(reader_stub));
+  }
+
   ScanRange* InitRange(ObjectPool* pool, const char* file_path, int offset, int len,
-      int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false) {
+      int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false,
+      std::vector<ScanRange::SubRange> sub_ranges = {}) {
     ScanRange* range = pool->Add(new ScanRange);
     range->Reset(nullptr, file_path, len, offset, disk_id, true,
-        BufferOpts(is_cached, mtime), meta_data);
+        BufferOpts(is_cached, mtime), move(sub_ranges), meta_data);
     EXPECT_EQ(mtime, range->mtime());
     return range;
   }
@@ -252,6 +260,12 @@ class DiskIoMgrTest : public testing::Test {
       const string& tmp_file, int offset, RequestContext* writer,
       const string& expected_output);
 
+  void SingleReaderTestBody(const char* data, const char* expected_result,
+      vector<ScanRange::SubRange> sub_ranges = {});
+
+  void CachedReadsTestBody(const char* data, const char* expected,
+      bool fake_cache, vector<ScanRange::SubRange> sub_ranges = {});
+
   /// Convenience function to get a reference to the buffer pool.
   BufferPool* buffer_pool() const { return ExecEnv::GetInstance()->buffer_pool(); }
 
@@ -522,13 +536,11 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
   buffer_pool()->DeregisterClient(&read_client);
 }
 
-// Basic test with a single reader, testing multiple threads, disks and a different
-// number of buffers.
-TEST_F(DiskIoMgrTest, SingleReader) {
-  InitRootReservation(LARGE_RESERVATION_LIMIT);
+void DiskIoMgrTest::SingleReaderTestBody(const char* data, const char* expected_result,
+    vector<ScanRange::SubRange> sub_ranges) {
   const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
-  const char* data = "abcdefghijklm";
-  int len = strlen(data);
+  int data_len = strlen(data);
+  int expected_result_len = strlen(expected_result);
   CreateTempFile(tmp_file, data);
 
   // Get mtime for file
@@ -554,9 +566,10 @@ TEST_F(DiskIoMgrTest, SingleReader) {
         unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
 
         vector<ScanRange*> ranges;
-        for (int i = 0; i < len; ++i) {
+        for (int i = 0; i < data_len; ++i) {
           int disk_id = i % num_disks;
-          ranges.push_back(InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime));
+          ranges.push_back(InitRange(&tmp_pool, tmp_file, 0, data_len, disk_id,
+              stat_val.st_mtime, nullptr, false, sub_ranges));
         }
         ASSERT_OK(reader->AddScanRanges(ranges));
 
@@ -564,7 +577,8 @@ TEST_F(DiskIoMgrTest, SingleReader) {
         thread_group threads;
         for (int i = 0; i < num_read_threads; ++i) {
           threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(),
-              &read_client, data, len, Status::OK(), 0, &num_ranges_processed));
+              &read_client, expected_result, expected_result_len, Status::OK(), 0,
+              &num_ranges_processed));
         }
         threads.join_all();
 
@@ -578,6 +592,23 @@ TEST_F(DiskIoMgrTest, SingleReader) {
   EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
 }
 
+// Basic test with a single reader, testing multiple threads, disks and a different
+// number of buffers.
+TEST_F(DiskIoMgrTest, SingleReader) {
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
+  const char* data = "abcdefghijklm";
+  SingleReaderTestBody(data, data);
+}
+
+TEST_F(DiskIoMgrTest, SingleReaderSubRanges) {
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
+  const char* data = "abcdefghijklm";
+  int64_t data_len = strlen(data);
+  SingleReaderTestBody(data, data, {{0, data_len}});
+  SingleReaderTestBody(data, "abdef", {{0, 2}, {3, 3}});
+  SingleReaderTestBody(data, "bceflm", {{1, 2}, {4, 2}, {11, 2}});
+}
+
 // This test issues adding additional scan ranges while there are some still in flight.
 TEST_F(DiskIoMgrTest, AddScanRangeTest) {
   InitRootReservation(LARGE_RESERVATION_LIMIT);
@@ -872,14 +903,10 @@ TEST_F(DiskIoMgrTest, MemScarcity) {
   }
 }
 
-// Test when some scan ranges are marked as being cached.
-// Since these files are not in HDFS, the cached path always fails so this
-// only tests the fallback mechanism.
-// TODO: we can fake the cached read path without HDFS
-TEST_F(DiskIoMgrTest, CachedReads) {
-  InitRootReservation(LARGE_RESERVATION_LIMIT);
+void DiskIoMgrTest::CachedReadsTestBody(const char* data, const char* expected,
+    bool fake_cache, vector<ScanRange::SubRange> sub_ranges) {
   const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
-  const char* data = "abcdefghijklm";
+  uint8_t* cached_data = reinterpret_cast<uint8_t*>(const_cast<char*>(data));
   int len = strlen(data);
   CreateTempFile(tmp_file, data);
 
@@ -898,17 +925,26 @@ TEST_F(DiskIoMgrTest, CachedReads) {
     unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
 
     ScanRange* complete_range =
-        InitRange(&pool_, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true);
+        InitRange(&pool_, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true,
+            sub_ranges);
+    if (fake_cache) {
+      SetReaderStub(complete_range, make_unique<CacheReaderTestStub>(
+          complete_range, cached_data, len));
+    }
 
     // Issue some reads before the async ones are issued
-    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
-    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected);
+    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected);
 
     vector<ScanRange*> ranges;
     for (int i = 0; i < len; ++i) {
       int disk_id = i % num_disks;
-      ranges.push_back(
-          InitRange(&pool_, tmp_file, 0, len, disk_id, stat_val.st_mtime, nullptr, true));
+      ScanRange* range = InitRange(&pool_, tmp_file, 0, len, disk_id, stat_val.st_mtime,
+          nullptr, true, sub_ranges);
+      ranges.push_back(range);
+      if (fake_cache) {
+        SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cached_data, len));
+      }
     }
     ASSERT_OK(reader->AddScanRanges(ranges));
 
@@ -916,19 +952,19 @@ TEST_F(DiskIoMgrTest, CachedReads) {
     thread_group threads;
     for (int i = 0; i < 5; ++i) {
       threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), &read_client,
-          data, strlen(data), Status::OK(), 0, &num_ranges_processed));
+          expected, strlen(expected), Status::OK(), 0, &num_ranges_processed));
     }
 
     // Issue some more sync ranges
     for (int i = 0; i < 5; ++i) {
       sched_yield();
-      ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+      ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected);
     }
 
     threads.join_all();
 
-    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
-    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected);
+    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected);
 
     EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
     io_mgr.UnregisterContext(reader.get());
@@ -938,6 +974,32 @@ TEST_F(DiskIoMgrTest, CachedReads) {
   EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
 }
 
+// Test when some scan ranges are marked as being cached.
+TEST_F(DiskIoMgrTest, CachedReads) {
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
+  const char* data = "abcdefghijklm";
+  // Don't fake the cache, i.e. test the fallback mechanism
+  CachedReadsTestBody(data, data, false);
+  // Fake the test with a file reader stub.
+  CachedReadsTestBody(data, data, true);
+}
+
+// Test when some scan ranges are marked as being cached and there
+// are sub-ranges as well.
+TEST_F(DiskIoMgrTest, CachedReadsSubRanges) {
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
+  const char* data = "abcdefghijklm";
+  int64_t data_len = strlen(data);
+
+  // first iteration tests the fallback mechanism with sub-ranges
+  // second iteration fakes a cache
+  for (bool fake_cache : {false, true}) {
+    CachedReadsTestBody(data, data, fake_cache, {{0, data_len}});
+    CachedReadsTestBody(data, "bc", fake_cache, {{1, 2}});
+    CachedReadsTestBody(data, "abchilm", fake_cache, {{0, 3}, {7, 2}, {11, 2}});
+  }
+}
+
 TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
   InitRootReservation(LARGE_RESERVATION_LIMIT);
   const int ITERATIONS = 1;
@@ -1401,6 +1463,63 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
   EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
 }
 
+// Test reading into a client-allocated buffer using sub-ranges.
+TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
+  const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+  const char* data = "the quick brown fox jumped over the lazy dog";
+  uint8_t* cache = reinterpret_cast<uint8_t*>(const_cast<char*>(data));
+  int data_len = strlen(data);
+  int read_len = 4; // Make buffer size smaller than client-provided buffer.
+  CreateTempFile(tmp_file, data);
+
+  // Get mtime for file
+  struct stat stat_val;
+  stat(tmp_file, &stat_val);
+
+  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
+  ASSERT_OK(io_mgr->Init());
+  // Reader doesn't need to provide client if it's providing buffers.
+  unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
+
+  auto test_case = [&](bool fake_cache, const char* expected_result,
+      vector<ScanRange::SubRange> sub_ranges) {
+    int result_len = strlen(expected_result);
+    vector<uint8_t> client_buffer(result_len);
+    ScanRange* range = pool_.Add(new ScanRange);
+    range->Reset(nullptr, tmp_file, data_len, 0, 0, true,
+        BufferOpts::ReadInto(fake_cache, stat_val.st_mtime, client_buffer.data(),
+            result_len), move(sub_ranges));
+    if (fake_cache) {
+      SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cache, data_len));
+    }
+    bool needs_buffers;
+    ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
+    ASSERT_FALSE(needs_buffers);
+
+    unique_ptr<BufferDescriptor> io_buffer;
+    ASSERT_OK(range->GetNext(&io_buffer));
+    ASSERT_TRUE(io_buffer->eosr());
+    ASSERT_EQ(result_len, io_buffer->len());
+    ASSERT_EQ(client_buffer.data(), io_buffer->buffer());
+    ASSERT_EQ(memcmp(io_buffer->buffer(), expected_result, result_len), 0);
+
+    // DiskIoMgr should not have allocated memory.
+    EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
+    range->ReturnBuffer(move(io_buffer));
+  };
+
+  for (bool fake_cache : {false, true}) {
+    test_case(fake_cache, data, {{0, data_len}});
+    test_case(fake_cache, data, {{0, 15}, {15, data_len - 15}});
+    test_case(fake_cache, "quick fox", {{4, 5}, {15, 4}});
+    test_case(fake_cache, "the brown dog", {{0, 3}, {9, 6}, {data_len - 4, 4}});
+  }
+
+  io_mgr->UnregisterContext(reader.get());
+  EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
+}
+
 // Test reading into a client-allocated buffer where the read fails.
 TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
   InitRootReservation(LARGE_RESERVATION_LIMIT);

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 75136ec..fa871d1 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -302,7 +302,7 @@ Status DiskIoMgr::AllocateBuffersForRange(
   BufferPool* bp = ExecEnv::GetInstance()->buffer_pool();
   Status status;
   vector<unique_ptr<BufferDescriptor>> buffers;
-  for (int64_t buffer_size : ChooseBufferSizes(range->len(), max_bytes)) {
+  for (int64_t buffer_size : ChooseBufferSizes(range->bytes_to_read(), max_bytes)) {
     BufferPool::BufferHandle handle;
     status = bp->AllocateBuffer(bp_client, buffer_size, &handle);
     if (!status.ok()) goto error;

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/file-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/file-reader.cc b/be/src/runtime/io/file-reader.cc
deleted file mode 100644
index 81ef090..0000000
--- a/be/src/runtime/io/file-reader.cc
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "gutil/strings/substitute.h"
-#include "runtime/io/file-reader.h"
-
-#include "common/names.h"
-
-namespace impala {
-namespace io {
-
-string FileReader::DebugString() const {
-  return Substitute("bytes_read=$0", bytes_read_);
-}
-
-}
-}

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/file-reader.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/file-reader.h b/be/src/runtime/io/file-reader.h
index 40f2a5c..9dbcc31 100644
--- a/be/src/runtime/io/file-reader.h
+++ b/be/src/runtime/io/file-reader.h
@@ -40,33 +40,31 @@ public:
   FileReader(ScanRange* scan_range) : scan_range_(scan_range) {}
   virtual ~FileReader() {}
 
-  /// Returns number of bytes read by this file reader.
-  int bytes_read() const { return bytes_read_; }
-
   /// Opens file that is associated with 'scan_range_'.
   /// 'use_file_handle_cache' currently only used by HdfsFileReader.
   virtual Status Open(bool use_file_handle_cache) = 0;
 
   /// Reads bytes from given position ('file_offset'). Tries to read
   /// 'bytes_to_read' amount of bytes. 'bytes_read' contains the number of
-  /// bytes actually read. 'eosr' is set to true when end of file has reached,
-  /// or the file reader has read all the bytes needed by 'scan_range_'.
+  /// bytes actually read. 'eof' is set to true when end of file has reached.
   virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
-      int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) = 0;
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) = 0;
 
   /// ***Currently only for HDFS***
-  /// Returns a pointer to a cached buffer that contains the contents of the file.
-  virtual void* CachedFile() = 0;
+  /// When successful, sets 'data' to a buffer that contains the contents of a file,
+  /// and 'length' is set to the length of the data.
+  /// When unsuccessful, 'data' is set to nullptr.
+  virtual void CachedFile(uint8_t** data, int64_t* length) = 0;
 
   /// Closes the file associated with 'scan_range_'. It doesn't have effect on other
   /// scan ranges.
   virtual void Close() = 0;
 
-  /// Reset internal bookkeeping, e.g. how many bytes have been read.
-  virtual void ResetState() { bytes_read_ = 0; }
+  /// Resets internal bookkeeping
+  virtual void ResetState() {}
 
   // Debug string of this file reader.
-  virtual std::string DebugString() const;
+  virtual std::string DebugString() const { return ""; }
 
   SpinLock& lock() { return lock_; }
 protected:
@@ -81,9 +79,6 @@ protected:
 
   /// The scan range this file reader serves.
   ScanRange* const scan_range_;
-
-  /// Number of bytes read by this reader.
-  int bytes_read_ = 0;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/hdfs-file-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index ea61b6a..c6a6cec 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -40,6 +40,7 @@ namespace io {
 
 HdfsFileReader::~HdfsFileReader() {
   DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed.";
+  DCHECK(cached_buffer_ == nullptr) << "Cached buffer was not released.";
 }
 
 Status HdfsFileReader::Open(bool use_file_handle_cache) {
@@ -73,7 +74,7 @@ Status HdfsFileReader::Open(bool use_file_handle_cache) {
 }
 
 Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
-    int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) {
+    int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
   DCHECK(scan_range_->read_in_flight());
   DCHECK_GE(bytes_to_read, 0);
   // Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
@@ -87,7 +88,7 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
 
   auto io_mgr = scan_range_->io_mgr_;
   auto request_context = scan_range_->reader_;
-  *eosr = false;
+  *eof = false;
   *bytes_read = 0;
 
   CachedHdfsFileHandle* borrowed_hdfs_fh = nullptr;
@@ -151,7 +152,7 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
       DCHECK_GT(current_bytes_read, -1);
       if (current_bytes_read == 0) {
         // No more bytes in the file. The scan range went past the end.
-        *eosr = true;
+        *eof = true;
         break;
       }
       *bytes_read += current_bytes_read;
@@ -164,12 +165,7 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
   if (borrowed_hdfs_fh != nullptr) {
     io_mgr->ReleaseCachedHdfsFileHandle(scan_range_->file_string(), borrowed_hdfs_fh);
   }
-  if (!status.ok())
-    return status;
-  bytes_read_ += *bytes_read;
-  DCHECK_LE(bytes_read_, scan_range_->len());
-  if (bytes_read_ == scan_range_->len()) *eosr = true;
-  return Status::OK();
+  return status;
 }
 
 Status HdfsFileReader::ReadFromPosInternal(hdfsFile hdfs_file, int64_t position_in_file,
@@ -203,16 +199,22 @@ Status HdfsFileReader::ReadFromPosInternal(hdfsFile hdfs_file, int64_t position_
   return Status::OK();
 }
 
-void* HdfsFileReader::CachedFile() {
+void HdfsFileReader::CachedFile(uint8_t** data, int64_t* length) {
   {
     unique_lock<SpinLock> hdfs_lock(lock_);
+    DCHECK(cached_buffer_ == nullptr);
     DCHECK(exclusive_hdfs_fh_ != nullptr);
     cached_buffer_ = hadoopReadZero(exclusive_hdfs_fh_->file(),
         scan_range_->io_mgr_->cached_read_options(), scan_range_->len());
   }
-  if (cached_buffer_ == nullptr) return nullptr;
-  bytes_read_ = hadoopRzBufferLength(cached_buffer_);
-  return const_cast<void*>(hadoopRzBufferGet(cached_buffer_));
+  if (cached_buffer_ == nullptr) {
+    *data = nullptr;
+    *length = 0;
+    return;
+  }
+  *data = reinterpret_cast<uint8_t*>(
+      const_cast<void*>(hadoopRzBufferGet(cached_buffer_)));
+  *length = hadoopRzBufferLength(cached_buffer_);
 }
 
 void HdfsFileReader::Close() {
@@ -220,11 +222,9 @@ void HdfsFileReader::Close() {
   if (exclusive_hdfs_fh_ != nullptr) {
     GetHdfsStatistics(exclusive_hdfs_fh_->file());
 
-    if (scan_range_->external_buffer_tag_ ==
-        ScanRange::ExternalBufferTag::CACHED_BUFFER) {
+    if (cached_buffer_ != nullptr) {
       hadoopRzBufferFree(exclusive_hdfs_fh_->file(), cached_buffer_);
       cached_buffer_ = nullptr;
-      scan_range_->external_buffer_tag_ = ScanRange::ExternalBufferTag::NO_BUFFER;
     }
 
     // Destroy the file handle.

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/hdfs-file-reader.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/hdfs-file-reader.h b/be/src/runtime/io/hdfs-file-reader.h
index 3b3642b..ba95095 100644
--- a/be/src/runtime/io/hdfs-file-reader.h
+++ b/be/src/runtime/io/hdfs-file-reader.h
@@ -34,11 +34,11 @@ public:
 
   virtual Status Open(bool use_file_handle_cache) override;
   virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
-      int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) override;
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
   /// Reads from the DN cache. On success, sets cached_buffer_ to the DN
   /// buffer and returns a pointer to the underlying raw buffer.
   /// Returns nullptr if the data is not cached.
-  virtual void* CachedFile() override;
+  virtual void CachedFile(uint8_t** data, int64_t* length) override;
   virtual void Close() override;
   virtual void ResetState() override;
   virtual std::string DebugString() const override;

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/local-file-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/local-file-reader.cc b/be/src/runtime/io/local-file-reader.cc
index 47c025b..3f88106 100644
--- a/be/src/runtime/io/local-file-reader.cc
+++ b/be/src/runtime/io/local-file-reader.cc
@@ -49,7 +49,7 @@ Status LocalFileReader::Open(bool use_file_handle_cache) {
 }
 
 Status LocalFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
-    int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) {
+    int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
   DCHECK(scan_range_->read_in_flight());
   DCHECK_GE(bytes_to_read, 0);
   // Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
@@ -61,7 +61,7 @@ Status LocalFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
   unique_lock<SpinLock> fs_lock(lock_);
   RETURN_IF_ERROR(scan_range_->cancel_status_);
 
-  *eosr = false;
+  *eof = false;
   *bytes_read = 0;
 
   DCHECK(file_ != nullptr);
@@ -85,17 +85,14 @@ Status LocalFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
     }
     // On Linux, we should only get partial reads from block devices on error or eof.
     DCHECK(feof(file_) != 0);
-    *eosr = true;
+    *eof = true;
   }
-  bytes_read_ += *bytes_read;
-  DCHECK_LE(bytes_read_, scan_range_->len());
-  if (bytes_read_ == scan_range_->len()) *eosr = true;
   return Status::OK();
 }
 
-void* LocalFileReader::CachedFile() {
-  DCHECK(false);
-  return nullptr;
+void LocalFileReader::CachedFile(uint8_t** data, int64_t* length) {
+  *data = nullptr;  // Local files are never cached, so always report no data.
+  *length = 0;
 }
 
 void LocalFileReader::Close() {

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/local-file-reader.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/local-file-reader.h b/be/src/runtime/io/local-file-reader.h
index d8eedd1..14cff3c 100644
--- a/be/src/runtime/io/local-file-reader.h
+++ b/be/src/runtime/io/local-file-reader.h
@@ -17,7 +17,6 @@
 
 #pragma once
 
-#include "common/hdfs.h"
 #include "runtime/io/file-reader.h"
 
 namespace impala {
@@ -32,9 +31,9 @@ public:
 
   virtual Status Open(bool use_file_handle_cache) override;
   virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
-      int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) override;
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
   /// We don't cache files of the local file system.
-  virtual void* CachedFile() override;
+  virtual void CachedFile(uint8_t** data, int64_t* length) override;
   virtual void Close() override;
 private:
   /// Points to a C FILE object between calls to Open() and Close(), otherwise nullptr.

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index 56f6704..24213f9 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -425,7 +425,8 @@ Status RequestContext::GetNextUnstartedRange(ScanRange** range, bool* needs_buff
       *range = cached_ranges_.Dequeue();
       DCHECK((*range)->try_cache());
       bool cached_read_succeeded;
-      RETURN_IF_ERROR((*range)->ReadFromCache(lock, &cached_read_succeeded));
+      RETURN_IF_ERROR(TryReadFromCache(lock, *range, &cached_read_succeeded,
+          needs_buffers));
       if (cached_read_succeeded) return Status::OK();
 
       // This range ended up not being cached. Loop again and pick up a new range.
@@ -472,12 +473,9 @@ Status RequestContext::StartScanRange(ScanRange* range, bool* needs_buffers) {
   DCHECK_NE(range->len(), 0);
   if (range->try_cache()) {
     bool cached_read_succeeded;
-    RETURN_IF_ERROR(range->ReadFromCache(lock, &cached_read_succeeded));
-    if (cached_read_succeeded) {
-      DCHECK(Validate()) << endl << DebugString();
-      *needs_buffers = false;
-      return Status::OK();
-    }
+    RETURN_IF_ERROR(TryReadFromCache(lock, range, &cached_read_succeeded,
+        needs_buffers));
+    if (cached_read_succeeded) return Status::OK();
     // Cached read failed, fall back to normal read path.
   }
   // If we don't have a buffer yet, the caller must allocate buffers for the range.
@@ -491,6 +489,35 @@ Status RequestContext::StartScanRange(ScanRange* range, bool* needs_buffers) {
   return Status::OK();
 }
 
+Status RequestContext::TryReadFromCache(const unique_lock<mutex>& lock,
+    ScanRange* range, bool* read_succeeded, bool* needs_buffers) {
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  RETURN_IF_ERROR(range->ReadFromCache(lock, read_succeeded));
+  if (!*read_succeeded) return Status::OK();
+  // Cache hit: work out whether the caller still has to supply buffers.
+  DCHECK(Validate()) << endl << DebugString();
+  ScanRange::ExternalBufferTag buffer_tag = range->external_buffer_tag();
+  // The following cases are possible at this point:
+  // * The scan range doesn't have sub-ranges:
+  // ** buffer_tag is CACHED_BUFFER and the buffer is already available to the reader.
+  //    (there is nothing to do)
+  //
+  // * The scan range has sub-ranges, and buffer_tag is:
+  // ** NO_BUFFER: the client needs to add buffers to the scan range
+  // ** CLIENT_BUFFER: the client already provided a buffer to copy data into it
+  *needs_buffers = buffer_tag == ScanRange::ExternalBufferTag::NO_BUFFER;
+  if (*needs_buffers) {
+    DCHECK(range->HasSubRanges());
+    range->SetBlockedOnBuffer();
+    // The range will be scheduled when buffers are added to it.
+    AddRangeToDisk(lock, range, ScheduleMode::BY_CALLER);
+  } else if (buffer_tag == ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
+    DCHECK(range->HasSubRanges());
+    AddRangeToDisk(lock, range, ScheduleMode::IMMEDIATELY);
+  }
+  return Status::OK();
+}
+
 Status RequestContext::AddWriteRange(WriteRange* write_range) {
   unique_lock<mutex> lock(lock_);
   if (state_ == RequestContext::Cancelled) return CONTEXT_CANCELLED;

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 0fcbca3..24b1bb5 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -281,6 +281,14 @@ class RequestContext {
   void RemoveActiveScanRangeLocked(
       const boost::unique_lock<boost::mutex>& lock, ScanRange* range);
 
+  /// Try to read the scan range from the cache. '*read_succeeded' is set to true if the
+  /// scan range can be found in the cache, otherwise false.
+  /// If '*needs_buffers' is returned as true, the caller must call
+  /// AllocateBuffersForRange() to add buffers for the data to be read into before the
+  /// range can be scheduled.
+  Status TryReadFromCache(const boost::unique_lock<boost::mutex>& lock, ScanRange* range,
+      bool* read_succeeded, bool* needs_buffers);
+
   // Counters are updated by other classes - expose to other io:: classes for convenience.
 
   /// Total bytes read for this reader

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 3bf55d6..1dc451a 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -60,9 +60,6 @@ class BufferDescriptor {
   int64_t len() { return len_; }
   bool eosr() { return eosr_; }
 
-  /// Returns the offset within the scan range that this buffer starts at
-  int64_t scan_range_offset() const { return scan_range_offset_; }
-
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferDescriptor);
   /// This class is tightly coupled with ScanRange. Making them friends is easiest.
@@ -101,8 +98,6 @@ class BufferDescriptor {
   /// true if the current scan range is complete
   bool eosr_ = false;
 
-  int64_t scan_range_offset_ = 0;
-
   // Handle to an allocated buffer and the client used to allocate it buffer. Only used
   // for non-external buffers.
   BufferPool::ClientHandle* bp_client_ = nullptr;
@@ -191,6 +186,15 @@ struct BufferOpts {
     return BufferOpts(false, NEVER_CACHE, client_buffer, client_buffer_len);
   }
 
+  /// Use only when you don't want to read the entire scan range, but only sub-ranges
+  /// of it. In this case the relevant parts can be copied from the HDFS cache into the
+  /// client buffer. The length of the buffer, 'client_buffer_len', must fit the
+  /// concatenation of all the sub-ranges.
+  static BufferOpts ReadInto(bool try_cache, int64_t mtime, uint8_t* client_buffer,
+      int64_t client_buffer_len) {
+    return BufferOpts(try_cache, mtime, client_buffer, client_buffer_len);
+  }
+
  private:
   friend class ScanRange;
   friend class HdfsFileReader;
@@ -227,6 +231,12 @@ class ScanRange : public RequestRange {
 
   virtual ~ScanRange();
 
+  /// Defines an internal range within this ScanRange.
+  struct SubRange {
+    int64_t offset;  // File offset of the first byte; must be >= the range's offset.
+    int64_t length;  // Number of bytes in the sub-range; must be > 0.
+  };
+
   /// Resets this scan range object with the scan range description. The scan range
   /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
   /// local filesystem). The scan range must be non-empty and fall within the file bounds
@@ -238,10 +248,17 @@ class ScanRange : public RequestRange {
   void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
       bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr);
 
+  /// Same as above, but it also adds sub-ranges. No need to merge contiguous sub-ranges
+  /// in advance, as this method will do the merge.
+  void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
+      bool expected_local, const BufferOpts& buffer_opts,
+      std::vector<SubRange>&& sub_ranges, void* meta_data = nullptr);
+
   void* meta_data() const { return meta_data_; }
   bool try_cache() const { return try_cache_; }
   bool read_in_flight() const { return read_in_flight_; }
   bool expected_local() const { return expected_local_; }
+  int64_t bytes_to_read() const { return bytes_to_read_; }
 
   /// Returns the next buffer for this scan range. buffer is an output parameter.
   /// This function blocks until a buffer is ready or an error occurred. If this is
@@ -269,7 +286,7 @@ class ScanRange : public RequestRange {
 
   int64_t mtime() const { return mtime_; }
 
-  int BytesRead() const;
+  bool HasSubRanges() const { return !sub_ranges_.empty(); }
 
  private:
   DISALLOW_COPY_AND_ASSIGN(ScanRange);
@@ -279,6 +296,7 @@ class ScanRange : public RequestRange {
   friend class BufferDescriptor;
   friend class DiskQueue;
   friend class DiskIoMgr;
+  friend class DiskIoMgrTest;
   friend class RequestContext;
   friend class HdfsFileReader;
   friend class LocalFileReader;
@@ -344,6 +362,9 @@ class ScanRange : public RequestRange {
   /// while any thread is inside a critical section.
   Status cancel_status_;
 
+  /// Only for testing
+  void SetFileReader(std::unique_ptr<FileReader> file_reader);
+
   /// END: private members that are accessed by other io:: classes
   /////////////////////////////////////////
 
@@ -383,10 +404,28 @@ class ScanRange : public RequestRange {
     return !cancel_status_.ok() || (eosr_queued_ && ready_buffers_.empty());
   }
 
+  /// Adds sub-ranges to this ScanRange. If sub_ranges is not empty, then ScanRange won't
+  /// read everything from its range, but will only read these sub-ranges.
+  /// Sub-ranges need to be ordered by 'offset' and cannot overlap with each other.
+  /// Doesn't need to merge continuous sub-ranges in advance, this method will do.
+  void InitSubRanges(std::vector<SubRange>&& sub_ranges);
+
+  /// Read the sub-ranges into buffer and track the current position in 'sub_range_pos_'.
+  /// If cached data is available, then memcpy() from it instead of actually reading the
+  /// files.
+  Status ReadSubRanges(BufferDescriptor* buffer, bool* eof);
+
   /// Validates the internal state of this range. lock_ must be taken
   /// before calling this.
   bool Validate();
 
+  /// Validates the sub-ranges. All sub-range must be inside of this ScanRange.
+  /// They need to be ordered by offset and cannot overlap.
+  bool ValidateSubRanges();
+
+  /// Merges adjacent and continuous sub-ranges.
+  void MergeSubRanges();
+
   /// Pointer to caller specified metadata. This is untouched by the io manager
   /// and the caller can put whatever auxiliary data in here.
   void* meta_data_ = nullptr;
@@ -418,18 +457,26 @@ class ScanRange : public RequestRange {
   /// Valid if the 'external_buffer_tag_' is CLIENT_BUFFER.
   struct {
     /// Client-provided buffer to read the whole scan range into.
-    uint8_t* data;
+    uint8_t* data = nullptr;
 
     /// Length of the client-provided buffer.
-    int64_t len;
+    int64_t len = 0;
   } client_buffer_;
 
+  /// Valid if reading file contents from cache was successful.
+  struct {
+    /// Pointer to the contents of the file.
+    uint8_t* data = nullptr;
+    /// Length of the contents.
+    int64_t len = 0;
+  } cache_;
+
   /// The number of buffers that have been returned to a client via GetNext() that have
   /// not yet been returned with ReturnBuffer().
   AtomicInt32 num_buffers_in_reader_{0};
 
   /// Lock protecting fields below.
-  /// This lock should not be taken during Open()/Read()/Close().
+  /// This lock should not be taken during FileReader::Open()/Read()/Close().
   /// If RequestContext::lock_ and this lock need to be held simultaneously,
   /// RequestContext::lock_ must be taken first.
   boost::mutex lock_;
@@ -475,8 +522,30 @@ class ScanRange : public RequestRange {
   /// cancelled.
   ConditionVariable buffer_ready_cv_;
 
+  /// Number of bytes read by this scan range.
+  int64_t bytes_read_ = 0;
+
   /// Polymorphic object that is responsible for doing file operations.
   std::unique_ptr<FileReader> file_reader_;
+
+  /// If not empty, the ScanRange will only read these parts from the file.
+  std::vector<SubRange> sub_ranges_;
+
+  // Read position in the sub-ranges.
+  struct SubRangePosition {
+    /// Index of SubRange in 'ScanRange::sub_ranges_' to read next
+    int64_t index = 0;
+    /// Bytes already read from 'ScanRange::sub_ranges_[sub_range_index]'
+    int64_t bytes_read = 0;
+  };
+
+  /// Current read position in the sub-ranges.
+  SubRangePosition sub_range_pos_;
+
+  /// Number of bytes need to be read by this ScanRange. If there are no sub-ranges it
+  /// equals to 'len_'. If there are sub-ranges then it equals to the sum of the lengths
+  /// of the sub-ranges (which is less than or equal to 'len_').
+  int64_t bytes_to_read_ = 0;
 };
 
 /// Used to specify data to be written to a file and offset.

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index aaef6b4..660710e 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -163,13 +163,8 @@ unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer(
   return result;
 }
 
-int ScanRange::BytesRead() const {
-  DCHECK(file_reader_ != nullptr);
-  return file_reader_->bytes_read();
-}
-
 ReadOutcome ScanRange::DoRead(int disk_id) {
-  int64_t bytes_remaining = len_ - BytesRead();
+  int64_t bytes_remaining = bytes_to_read_ - bytes_read_;
   DCHECK_GT(bytes_remaining, 0);
 
   unique_ptr<BufferDescriptor> buffer_desc;
@@ -183,8 +178,8 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
           this, client_buffer_.data, client_buffer_.len));
     } else {
       DCHECK(external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
-          << "This code path does not handle other buffer types, i.e. HDFS cache"
-          << static_cast<int>(external_buffer_tag_);
+          << "This code path does not handle other buffer types, i.e. HDFS cache. "
+          << "external_buffer_tag_=" << static_cast<int>(external_buffer_tag_);
       buffer_desc = GetUnusedBuffer(lock);
       if (buffer_desc == nullptr) {
         // No buffer available - the range will be rescheduled when a buffer is added.
@@ -199,21 +194,27 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
   // No locks in this section.  Only working on local vars.  We don't want to hold a
   // lock across the read call.
   Status read_status = file_reader_->Open(is_file_handle_caching_enabled());
+  bool eof = false;
   if (read_status.ok()) {
     COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L);
     COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL << disk_id);
 
-    read_status = file_reader_->ReadFromPos(offset_ + BytesRead(), buffer_desc->buffer_,
-        min(len() - BytesRead(), buffer_desc->buffer_len_), &buffer_desc->len_,
-        &buffer_desc->eosr_);
-    buffer_desc->scan_range_offset_ = BytesRead() - buffer_desc->len_;
+    if (sub_ranges_.empty()) {
+      DCHECK(cache_.data == nullptr);
+      read_status = file_reader_->ReadFromPos(offset_ + bytes_read_, buffer_desc->buffer_,
+          min(len() - bytes_read_, buffer_desc->buffer_len_),
+          &buffer_desc->len_, &eof);
+    } else {
+      read_status = ReadSubRanges(buffer_desc.get(), &eof);
+    }
 
     COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
     COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, -1L);
   }
 
   DCHECK(buffer_desc->buffer_ != nullptr);
-  DCHECK(!buffer_desc->is_cached()) << "HDFS cache reads don't go through this code path.";
+  DCHECK(!buffer_desc->is_cached()) <<
+      "Pure HDFS cache reads don't go through this code path.";
   if (!read_status.ok()) {
     // Free buffer to release resources before we cancel the range so that all buffers
     // are freed at cancellation.
@@ -228,9 +229,15 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
     return ReadOutcome::CANCELLED;
   }
 
+  bytes_read_ += buffer_desc->len();
+  DCHECK_LE(bytes_read_, bytes_to_read_);
+
+  // It is end of stream if it is end of file, or read all the bytes.
+  buffer_desc->eosr_ = eof || bytes_read_ == bytes_to_read_;
+
   // After calling EnqueueReadyBuffer(), it is no longer valid to touch 'buffer_desc'.
   // Store the state we need before calling EnqueueReadyBuffer().
-  bool eosr = buffer_desc->eosr_;
+  bool eosr = buffer_desc->eosr();
   // Read successful - enqueue the buffer and return the appropriate outcome.
   if (!EnqueueReadyBuffer(move(buffer_desc))) return ReadOutcome::CANCELLED;
   if (eosr) {
@@ -241,6 +248,42 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
   return ReadOutcome::SUCCESS_NO_EOSR;
 }
 
+Status ScanRange::ReadSubRanges(BufferDescriptor* buffer_desc, bool* eof) {
+  buffer_desc->len_ = 0;
+  while (buffer_desc->len() < buffer_desc->buffer_len() &&
+      sub_range_pos_.index < sub_ranges_.size()) {
+    SubRange& sub_range = sub_ranges_[sub_range_pos_.index];
+    int64_t offset = sub_range.offset + sub_range_pos_.bytes_read;
+    int64_t bytes_to_read = min(sub_range.length - sub_range_pos_.bytes_read,
+        buffer_desc->buffer_len() - buffer_desc->len());
+    // 'offset' is a file offset, but 'cache_.data' starts at 'offset_' (range start).
+    if (cache_.data != nullptr) {
+      memcpy(buffer_desc->buffer_ + buffer_desc->len(),
+          cache_.data + (offset - offset_), bytes_to_read);
+    } else {
+      int64_t current_bytes_read = 0;
+      Status read_status = file_reader_->ReadFromPos(offset,
+          buffer_desc->buffer_ + buffer_desc->len(), bytes_to_read, &current_bytes_read,
+          eof);
+      if (!read_status.ok()) return read_status;
+      if (current_bytes_read != bytes_to_read) {
+        DCHECK(*eof);
+        DCHECK_LT(current_bytes_read, bytes_to_read);
+        return Status(TErrorCode::SCANNER_INCOMPLETE_READ, bytes_to_read,
+            current_bytes_read, file(), offset);
+      }
+    }
+    // Account for the copied bytes; move to the next sub-range once this one is done.
+    buffer_desc->len_ += bytes_to_read;
+    sub_range_pos_.bytes_read += bytes_to_read;
+    if (sub_range_pos_.bytes_read == sub_range.length) {
+      sub_range_pos_.index += 1;
+      sub_range_pos_.bytes_read = 0;
+    }
+  }
+  return Status::OK();
+}
+
 void ScanRange::SetBlockedOnBuffer() {
   unique_lock<mutex> lock(lock_);
   blocked_on_buffer_ = true;
@@ -339,9 +382,10 @@ string ScanRange::DebugString() const {
 }
 
 bool ScanRange::Validate() {
-  if (BytesRead() > len_) {
-    LOG(ERROR) << "Bytes read tracking is wrong. Shouldn't read past the scan range."
-                 << " bytes_read_=" << BytesRead() << " len_=" << len_;
+  if (bytes_read_ > bytes_to_read_) {
+    LOG(ERROR) << "Bytes read tracking is wrong. Too many bytes have been read."
+                 << " bytes_read_=" << bytes_read_
+                 << " bytes_to_read_=" << bytes_to_read_;
     return false;
   }
   if (!cancel_status_.ok() && !ready_buffers_.empty()) {
@@ -377,8 +421,6 @@ ScanRange::ScanRange()
     external_buffer_tag_(ExternalBufferTag::NO_BUFFER) {}
 
 ScanRange::~ScanRange() {
-  DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER)
-      << "Cached buffer was not released.";
   DCHECK(!read_in_flight_);
   DCHECK_EQ(0, ready_buffers_.size());
   DCHECK_EQ(0, num_buffers_in_reader_.Load());
@@ -386,6 +428,12 @@ ScanRange::~ScanRange() {
 
 void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
     int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data) {
+  Reset(fs, file, len, offset, disk_id, expected_local, buffer_opts, {}, meta_data);
+}
+
+void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
+    int disk_id, bool expected_local, const BufferOpts& buffer_opts,
+    vector<SubRange>&& sub_ranges, void* meta_data) {
   DCHECK(ready_buffers_.empty());
   DCHECK(!read_in_flight_);
   DCHECK(file != nullptr);
@@ -401,6 +449,7 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
   }
   file_ = file;
   len_ = len;
+  bytes_to_read_ = len;
   offset_ = offset;
   disk_id_ = disk_id;
   try_cache_ = buffer_opts.try_cache_;
@@ -416,6 +465,59 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
   expected_local_ = expected_local;
   io_mgr_ = nullptr;
   reader_ = nullptr;
+  sub_ranges_.clear();
+  sub_range_pos_ = {};
+  InitSubRanges(move(sub_ranges));
+}
+
+void ScanRange::InitSubRanges(vector<SubRange>&& sub_ranges) {
+  sub_ranges_ = std::move(sub_ranges);
+  DCHECK(ValidateSubRanges());
+  MergeSubRanges();
+  DCHECK(ValidateSubRanges());
+  sub_range_pos_ = {};
+  // Without sub-ranges 'bytes_to_read_' is left unchanged.
+  if (sub_ranges_.empty()) return;
+  // Sum in int64_t: the lengths are int64_t and a 32-bit 'int' sum could overflow.
+  int64_t length_sum = 0;
+  for (const SubRange& sub_range : sub_ranges_) {
+    length_sum += sub_range.length;
+  }
+  bytes_to_read_ = length_sum;
+}
+
+bool ScanRange::ValidateSubRanges() {
+  // Every sub-range must be non-empty and lie within [offset_, offset_ + len_);
+  // consecutive sub-ranges must be sorted by offset and must not overlap.
+  const int64_t range_end = offset_ + len_;
+  for (size_t idx = 0; idx < sub_ranges_.size(); ++idx) {
+    const SubRange& cur = sub_ranges_[idx];
+    const int64_t cur_end = cur.offset + cur.length;
+    if (cur.length <= 0 || cur.offset < offset_ || cur_end > range_end) return false;
+    // The last sub-range has no successor to compare against.
+    if (idx + 1 == sub_ranges_.size()) break;
+    if (cur_end > sub_ranges_[idx + 1].offset) return false;
+  }
+  return true;
+}
+
+void ScanRange::MergeSubRanges() {
+  if (sub_ranges_.empty()) return;
+  // Single-pass compaction: 'last' indexes the most recently kept sub-range. Each
+  // following sub-range is either appended to it (when it starts exactly where
+  // 'last' ends) or moved down to become the new 'last'. Avoids calling
+  // vector::erase() inside the loop, which is quadratic in the worst case.
+  size_t last = 0;
+  for (size_t next = 1; next < sub_ranges_.size(); ++next) {
+    SubRange& kept = sub_ranges_[last];
+    const SubRange& candidate = sub_ranges_[next];
+    if (candidate.offset == kept.offset + kept.length) {
+      kept.length += candidate.length;
+    } else {
+      sub_ranges_[++last] = candidate;
+    }
+  }
+  sub_ranges_.resize(last + 1);
 }
 
 void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
@@ -427,10 +529,16 @@ void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
   cancel_status_ = Status::OK();
   eosr_queued_ = false;
   blocked_on_buffer_ = false;
+  bytes_read_ = 0;
+  sub_range_pos_ = {};
   file_reader_->ResetState();
   DCHECK(Validate()) << DebugString();
 }
 
+void ScanRange::SetFileReader(unique_ptr<FileReader> file_reader) {
+  file_reader_.reset(file_reader.release());  // Take ownership; frees the old reader.
+}
+
 int64_t ScanRange::MaxReadChunkSize() const {
   // S3 InputStreams don't support DIRECT_READ (i.e. java.nio.ByteBuffer read()
   // interface).  So, hdfsRead() needs to allocate a Java byte[] and copy the data out.
@@ -454,54 +562,54 @@ Status ScanRange::ReadFromCache(
     const unique_lock<mutex>& reader_lock, bool* read_succeeded) {
   DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
   DCHECK(try_cache_);
-  DCHECK_EQ(BytesRead(), 0);
+  DCHECK_EQ(bytes_read_, 0);
   *read_succeeded = false;
   Status status = file_reader_->Open(false);
   if (!status.ok()) return status;
 
-  // Cached reads not supported on local filesystem.
-  if (fs_ == nullptr) return Status::OK();
-
   // Check cancel status.
   {
     unique_lock<mutex> lock(lock_);
     RETURN_IF_ERROR(cancel_status_);
   }
 
-  DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER);
-  void* buffer = file_reader_->CachedFile();
-  if (buffer != nullptr) external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
+  file_reader_->CachedFile(&cache_.data, &cache_.len);
   // Data was not cached, caller will fall back to normal read path.
-  if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) {
+  if (cache_.data == nullptr) {
     VLOG_QUERY << "Cache read failed for scan range: " << DebugString()
                << ". Switching to disk read path.";
     // Clean up the scan range state before re-issuing it.
     file_reader_->Close();
     return Status::OK();
   }
-  int bytes_read = BytesRead();
   // A partial read can happen when files are truncated.
   // TODO: If HDFS ever supports partially cached blocks, we'll have to distinguish
   // between errors and partially cached blocks here.
-  if (bytes_read < len()) {
+  if (cache_.len < len()) {
     VLOG_QUERY << "Error reading file from HDFS cache: " << file_ << ". Expected "
-      << len() << " bytes, but read " << bytes_read << ". Switching to disk read path.";
+      << len() << " bytes, but read " << cache_.len << ". Switching to disk read path.";
     // Close the scan range. 'read_succeeded' is still false, so the caller will fall back
     // to non-cached read of this scan range.
     file_reader_->Close();
     return Status::OK();
   }
 
+  *read_succeeded = true;
+  // If there are sub-ranges, then we need to memcpy() them from the cached buffer.
+  if (HasSubRanges()) return Status::OK();
+
+  DCHECK(external_buffer_tag_ != ExternalBufferTag::CLIENT_BUFFER);
+  external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
+  bytes_read_ = cache_.len;
+
   // Create a single buffer desc for the entire scan range and enqueue that.
   // The memory is owned by the HDFS java client, not the Impala backend.
   unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(
-      this, reinterpret_cast<uint8_t*>(buffer), 0));
-  desc->len_ = bytes_read;
-  desc->scan_range_offset_ = 0;
+      this, cache_.data, 0));
+  desc->len_ = cache_.len;
   desc->eosr_ = true;
   EnqueueReadyBuffer(move(desc));
-  COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, bytes_read);
-  *read_succeeded = true;
+  COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, cache_.len);
   return Status::OK();
 }
 


[2/4] impala git commit: IMPALA-7555: Set socket timeout in impala-shell

Posted by mi...@apache.org.
IMPALA-7555: Set socket timeout in impala-shell

impala-shell does not set any socket timeout while connecting to the
impala server. This change sets a timeout on the socket before
connecting and clears it again once the connection succeeds. The
default connect timeout is 5 seconds.
Usage: impala-shell --client_connect_timeout=<value in ms>

Testing:
1. Added a test where I create a random listening socket.
impala-shell (with ssl enabled) connects to this socket and
times out after 2 sec.

2. Created a kerberized impala cluster with ssl enabled and
connected to the impalad using an openssl client, which blocks the
beeswax server thread from accepting new connections, e.g.:

  openssl s_client -connect <IP Addr>:21000
Then used impala-shell to connect to the same impalad.
impala-shell timed out after the default of 5 sec. I verified
this manually.

Change-Id: I130fc47f7a83f591918d6842634b4e5787d00813
Reviewed-on: http://gerrit.cloudera.org:8080/11540
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2fb8ebae
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2fb8ebae
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2fb8ebae

Branch: refs/heads/master
Commit: 2fb8ebaef2beecd511e963fadbb41cbb11add138
Parents: 5af5456
Author: aphadke <ap...@cloudera.com>
Authored: Thu Sep 20 16:51:23 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Oct 18 01:41:42 2018 +0000

----------------------------------------------------------------------
 shell/impala_client.py                | 29 +++++++++++++++++++++++------
 shell/impala_shell.py                 | 17 +++++++++--------
 shell/impala_shell_config_defaults.py |  1 +
 shell/option_parser.py                |  5 ++++-
 tests/shell/test_shell_commandline.py | 15 +++++++++++++++
 5 files changed, 52 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/impala_client.py
----------------------------------------------------------------------
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 2f2a5e9..e53637a 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -18,6 +18,7 @@
 # under the License.
 
 import sasl
+import sys
 import time
 
 from beeswaxd import BeeswaxService
@@ -57,11 +58,16 @@ class DisconnectedException(Exception):
 
 class QueryCancelledByShellException(Exception): pass
 
+
+def print_to_stderr(message):
+  print >> sys.stderr, message
+
 class ImpalaClient(object):
 
   def __init__(self, impalad, kerberos_host_fqdn, use_kerberos=False,
                kerberos_service_name="impala", use_ssl=False, ca_cert=None, user=None,
-               ldap_password=None, use_ldap=False):
+               ldap_password=None, use_ldap=False, client_connect_timeout_ms=5000,
+               verbose=True):
     self.connected = False
     self.impalad_host = impalad[0].encode('ascii', 'ignore')
     self.impalad_port = int(impalad[1])
@@ -74,6 +80,7 @@ class ImpalaClient(object):
     self.ca_cert = ca_cert
     self.user, self.ldap_password = user, ldap_password
     self.use_ldap = use_ldap
+    self.client_connect_timeout_ms = int(client_connect_timeout_ms)
     self.default_query_options = {}
     self.query_option_levels = {}
     self.query_state = QueryState._NAMES_TO_VALUES
@@ -82,6 +89,7 @@ class ImpalaClient(object):
     # from command line via CTRL+C. It is used to suppress error messages of
     # query cancellation.
     self.is_query_cancelled = False
+    self.verbose = verbose
 
   def _options_to_string_list(self, set_query_options):
     return ["%s=%s" % (k, v) for (k, v) in set_query_options.iteritems()]
@@ -250,8 +258,15 @@ class ImpalaClient(object):
       self.transport = None
 
     self.connected = False
-    self.transport = self._get_transport()
+    sock, self.transport = self._get_socket_and_transport()
+    if self.client_connect_timeout_ms > 0:
+      sock.setTimeout(self.client_connect_timeout_ms)
     self.transport.open()
+    if self.verbose:
+      print_to_stderr('Opened TCP connection to %s:%s' % (self.impalad_host,
+          self.impalad_port))
+    # Setting a timeout of None disables timeouts on sockets
+    sock.setTimeout(None)
     protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
     self.imp_service = ImpalaService.Client(protocol)
     result = self.ping_impala_service()
@@ -266,7 +281,7 @@ class ImpalaClient(object):
     if self.transport:
       self.transport.close()
 
-  def _get_transport(self):
+  def _get_socket_and_transport(self):
     """Create a Transport.
 
        A non-kerberized impalad just needs a simple buffered transport. For
@@ -274,6 +289,7 @@ class ImpalaClient(object):
 
        If SSL is enabled, a TSSLSocket underlies the transport stack; otherwise a TSocket
        is used.
+       This function returns the socket and the transport object.
     """
     if self.use_ssl:
       # TSSLSocket needs the ssl module, which may not be standard on all Operating
@@ -304,7 +320,8 @@ class ImpalaClient(object):
     else:
       sock = TSocket(sock_host, sock_port)
     if not (self.use_ldap or self.use_kerberos):
-      return TBufferedTransport(sock)
+      return sock, TBufferedTransport(sock)
+
     # Initializes a sasl client
     def sasl_factory():
       sasl_client = sasl.Client()
@@ -318,9 +335,9 @@ class ImpalaClient(object):
       return sasl_client
     # GSSASPI is the underlying mechanism used by kerberos to authenticate.
     if self.use_kerberos:
-      return TSaslClientTransport(sasl_factory, "GSSAPI", sock)
+      return sock, TSaslClientTransport(sasl_factory, "GSSAPI", sock)
     else:
-      return TSaslClientTransport(sasl_factory, "PLAIN", sock)
+      return sock, TSaslClientTransport(sasl_factory, "PLAIN", sock)
 
   def create_beeswax_query(self, query_str, set_query_options):
     """Create a beeswax query object from a query string"""

http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index a7b1ac8..9277071 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -168,7 +168,7 @@ class ImpalaShell(object, cmd.Cmd):
     self.ldap_password = options.ldap_password
     self.ldap_password_cmd = options.ldap_password_cmd
     self.use_ldap = options.use_ldap
-
+    self.client_connect_timeout_ms = options.client_connect_timeout_ms
     self.verbose = options.verbose
     self.prompt = ImpalaShell.DISCONNECTED_PROMPT
     self.server_version = ImpalaShell.UNKNOWN_SERVER_VERSION
@@ -518,7 +518,7 @@ class ImpalaShell(object, cmd.Cmd):
     return ImpalaClient(self.impalad, self.kerberos_host_fqdn, self.use_kerberos,
                         self.kerberos_service_name, self.use_ssl,
                         self.ca_cert, self.user, self.ldap_password,
-                        self.use_ldap)
+                        self.use_ldap, self.client_connect_timeout_ms, self.verbose)
 
   def _signal_handler(self, signal, frame):
     """Handles query cancellation on a Ctrl+C event"""
@@ -818,12 +818,13 @@ class ImpalaShell(object, cmd.Cmd):
       print_to_stderr("Unable to import the python 'ssl' module. It is"
       " required for an SSL-secured connection.")
       sys.exit(1)
-    except socket.error, (code, e):
+    except socket.error, e:
       # if the socket was interrupted, reconnect the connection with the client
-      if code == errno.EINTR:
+      if e.errno == errno.EINTR:
         self._reconnect_cancellation()
       else:
-        print_to_stderr("Socket error %s: %s" % (code, e))
+        print_to_stderr("Socket error %s: %s" % (e.errno, e))
+        self.imp_client.close_connection()
         self.prompt = self.DISCONNECTED_PROMPT
     except Exception, e:
       if self.ldap_password_cmd and \
@@ -1507,7 +1508,7 @@ def parse_variables(keyvals):
 
 def replace_variables(set_variables, string):
   """Replaces variable within the string with their corresponding values using the
-  given set_variables."""
+     given set_variables."""
   errors = False
   matches = set(map(lambda v: v.upper(), re.findall(r'(?<!\\)\${([^}]+)}', string)))
   for name in matches:
@@ -1537,8 +1538,8 @@ def replace_variables(set_variables, string):
 
 
 def get_var_name(name):
-  """Look for a namespace:var_name pattern in an option name.
-     Return the variable name if it's a match or None otherwise.
+  """Looks for a namespace:var_name pattern in an option name.
+     Returns the variable name if it's a match or None otherwise.
   """
   ns_match = re.match(r'^([^:]*):(.*)', name)
   if ns_match is not None:

http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/impala_shell_config_defaults.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell_config_defaults.py b/shell/impala_shell_config_defaults.py
index 260e93e..be7685b 100644
--- a/shell/impala_shell_config_defaults.py
+++ b/shell/impala_shell_config_defaults.py
@@ -51,4 +51,5 @@ impala_shell_defaults = {
             'verbose': True,
             'version': False,
             'write_delimited': False,
+            'client_connect_timeout_ms': 5000,
             }

http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/option_parser.py
----------------------------------------------------------------------
diff --git a/shell/option_parser.py b/shell/option_parser.py
index 000e319..ccae53b 100755
--- a/shell/option_parser.py
+++ b/shell/option_parser.py
@@ -221,7 +221,10 @@ def get_option_parser(defaults):
                          " It must follow the pattern \"KEY=VALUE\","
                          " KEY must be a valid query option. Valid query options "
                          " can be listed by command 'set'.")
-
+  parser.add_option("-t", "--client_connect_timeout_ms",
+                    help="Timeout in milliseconds after which impala-shell will time out"
+                    " if it fails to connect to Impala server. Set to 0 to disable any"
+                    " timeout.")
   # add default values to the help text
   for option in parser.option_list:
     # since the quiet flag is the same as the verbose flag

http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/tests/shell/test_shell_commandline.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index b74fbc2..fd18230 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -32,6 +32,7 @@ from tests.common.skip import SkipIf
 from time import sleep, time
 from util import IMPALAD, SHELL_CMD
 from util import assert_var_substitution, run_impala_shell_cmd, ImpalaShell
+from contextlib import closing
 
 DEFAULT_QUERY = 'select 1'
 QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell')
@@ -757,3 +758,17 @@ class TestImpalaShell(ImpalaTestSuite):
       find_query_option("duplicate", test_input)
     with pytest.raises(AssertionError):
       find_query_option("not_an_option", test_input)
+
+  def test_impala_shell_timeout(self):
+    """Tests that impala shell times out during connect.
+       This creates a random listening socket and we try to connect to this
+       socket through the impala-shell. The impala-shell should timeout and not hang
+       indefinitely while connecting
+    """
+    with closing(socket.socket()) as s:
+      s.bind(("", 0))
+      # maximum number of queued connections on this socket is 1.
+      s.listen(1)
+      test_port = s.getsockname()[1]
+      args = '-q "select foo; select bar;" --ssl -t 2000 -i localhost:%d' % (test_port)
+      run_impala_shell_cmd(args, expect_success=False)


[4/4] impala git commit: IMPALA-7272: Fix crash in StringMinMaxFilter

Posted by mi...@apache.org.
IMPALA-7272: Fix crash in StringMinMaxFilter

StringMinMaxFilter uses a MemPool to allocate space for StringBuffers.
Previously, the MemPool was created by RuntimeFilterBank and passed to
each StringMinMaxFilter. In queries with multiple StringMinMaxFilters
being generated by the same fragment instance, this leads to
overlapping use of the MemPool by different threads, which is
incorrect as MemPools are not thread-safe.

The solution is to have each StringMinMaxFilter create its own
MemPool.

This patch also documents MemPool as not thread-safe and introduces a
DFAKE_MUTEX to help enforce correct usage. Doing this requires
modifying our CMakeLists.txt to pass '-DNDEBUG' to clang only in
RELEASE builds, so that the DFAKE_MUTEX will be present in the
compiled IR for DEBUG builds.

Testing:
- I have been unable to repro the actual crash despite trying a large
  variety of different things. However, with additional logging added
  it's clear that the MemPool is being used concurrently, which is
  incorrect.
- Added an e2e test that covers the potential issue. It triggers the
  DFAKE_MUTEX check when a sleep is added to MemPool::Allocate.
- Ran a full exhaustive build in both DEBUG and RELEASE.

Change-Id: I751cad7e6b75c9d95d7ad029bbd1e52fe09e8a29
Reviewed-on: http://gerrit.cloudera.org:8080/11650
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9f5c5e6d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9f5c5e6d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9f5c5e6d

Branch: refs/heads/master
Commit: 9f5c5e6df03824cba292fe5a619153462c11669c
Parents: 48fb490
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Wed Oct 10 11:43:50 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Oct 18 20:56:52 2018 +0000

----------------------------------------------------------------------
 be/CMakeLists.txt                               |  9 ++--
 be/src/runtime/mem-pool.cc                      |  5 ++
 be/src/runtime/mem-pool.h                       | 11 +++++
 be/src/runtime/runtime-filter-bank.cc           | 11 +++--
 be/src/runtime/runtime-filter-bank.h            |  9 ++--
 be/src/util/min-max-filter-test.cc              | 49 ++++++++++++--------
 be/src/util/min-max-filter.cc                   | 18 ++++---
 be/src/util/min-max-filter.h                    | 26 +++++++----
 .../queries/QueryTest/min_max_filters.test      | 24 ++++++++++
 9 files changed, 113 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/9f5c5e6d/be/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 2188294..99cbf81 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -222,15 +222,16 @@ add_definitions(-DKUDU_HEADERS_USE_SHORT_STATUS_MACROS -DKUDU_HEADERS_USE_RICH_S
 #  -Wno-return-type-c-linkage: UDFs return C++ classes but use C linkage to prevent
 #       mangling
 #  -DBOOST_NO_EXCEPTIONS: call a custom error handler for exceptions in codegen'd code.
-set(CLANG_IR_CXX_FLAGS "-emit-llvm" "-c" "-std=c++14" "-DIR_COMPILE" "-DNDEBUG"
-  "-DHAVE_INTTYPES_H" "-DHAVE_NETINET_IN_H" "-DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG"
-  "-DBOOST_NO_EXCEPTIONS" "-fcolor-diagnostics" "-Wno-deprecated"
-  "-Wno-return-type-c-linkage" "-O1")
+set(CLANG_IR_CXX_FLAGS "-emit-llvm" "-c" "-std=c++14" "-DIR_COMPILE" "-DHAVE_INTTYPES_H"
+  "-DHAVE_NETINET_IN_H" "-DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG" "-DBOOST_NO_EXCEPTIONS"
+  "-fcolor-diagnostics" "-Wno-deprecated" "-Wno-return-type-c-linkage" "-O1")
 # -Werror: compile warnings should be errors when using the toolchain compiler.
 set(CLANG_IR_CXX_FLAGS "${CLANG_IR_CXX_FLAGS}" "-Werror")
 
 if ("${CMAKE_BUILD_TYPE}" STREQUAL "ADDRESS_SANITIZER")
   SET(CLANG_IR_CXX_FLAGS "${CLANG_IR_CXX_FLAGS}" "-DADDRESS_SANITIZER")
+elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE")
+  SET(CLANG_IR_CXX_FLAGS "${CLANG_IR_CXX_FLAGS}" "-DNDEBUG")
 endif()
 
 if (UBSAN_CODEGEN)

http://git-wip-us.apache.org/repos/asf/impala/blob/9f5c5e6d/be/src/runtime/mem-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool.cc b/be/src/runtime/mem-pool.cc
index 4d00d82..c0aa438 100644
--- a/be/src/runtime/mem-pool.cc
+++ b/be/src/runtime/mem-pool.cc
@@ -72,6 +72,7 @@ MemPool::~MemPool() {
 }
 
 void MemPool::Clear() {
+  DFAKE_SCOPED_LOCK(mutex_);
   current_chunk_idx_ = -1;
   for (auto& chunk: chunks_) {
     chunk.allocated_bytes = 0;
@@ -82,6 +83,7 @@ void MemPool::Clear() {
 }
 
 void MemPool::FreeAll() {
+  DFAKE_SCOPED_LOCK(mutex_);
   int64_t total_bytes_released = 0;
   for (auto& chunk: chunks_) {
     total_bytes_released += chunk.size;
@@ -162,6 +164,7 @@ bool MemPool::FindChunk(int64_t min_size, bool check_limits) noexcept {
 }
 
 void MemPool::AcquireData(MemPool* src, bool keep_current) {
+  DFAKE_SCOPED_LOCK(mutex_);
   DCHECK(src->CheckIntegrity(false));
   int num_acquired_chunks;
   if (keep_current) {
@@ -213,6 +216,7 @@ void MemPool::AcquireData(MemPool* src, bool keep_current) {
 }
 
 void MemPool::SetMemTracker(MemTracker* new_tracker) {
+  DFAKE_SCOPED_LOCK(mutex_);
   mem_tracker_->TransferTo(new_tracker, total_reserved_bytes_);
   mem_tracker_ = new_tracker;
 }
@@ -236,6 +240,7 @@ string MemPool::DebugString() {
 }
 
 int64_t MemPool::GetTotalChunkSizes() const {
+  DFAKE_SCOPED_LOCK(mutex_);
   int64_t result = 0;
   for (int i = 0; i < chunks_.size(); ++i) {
     result += chunks_[i].size;

http://git-wip-us.apache.org/repos/asf/impala/blob/9f5c5e6d/be/src/runtime/mem-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool.h b/be/src/runtime/mem-pool.h
index 64a769e..1d2b633 100644
--- a/be/src/runtime/mem-pool.h
+++ b/be/src/runtime/mem-pool.h
@@ -28,6 +28,7 @@
 
 #include "common/logging.h"
 #include "gutil/dynamic_annotations.h"
+#include "gutil/threading/thread_collision_warner.h"
 #include "util/bit-util.h"
 
 namespace impala {
@@ -88,6 +89,8 @@ class MemTracker;
 /// At this point p.total_allocated_bytes_ would be 0.
 /// The one remaining (empty) chunk is released:
 ///    delete p;
+//
+/// This class is not thread-safe. A DFAKE_MUTEX is used to help enforce correct usage.
 
 class MemPool {
  public:
@@ -104,6 +107,7 @@ class MemPool {
   /// of the the current chunk. Creates a new chunk if there aren't any chunks
   /// with enough capacity.
   uint8_t* Allocate(int64_t size) noexcept {
+    DFAKE_SCOPED_LOCK(mutex_);
     return Allocate<false>(size, DEFAULT_ALIGNMENT);
   }
 
@@ -112,12 +116,14 @@ class MemPool {
   /// The caller must handle the NULL case. This should be used for allocations
   /// where the size can be very big to bound the amount by which we exceed mem limits.
   uint8_t* TryAllocate(int64_t size) noexcept {
+    DFAKE_SCOPED_LOCK(mutex_);
     return Allocate<true>(size, DEFAULT_ALIGNMENT);
   }
 
   /// Same as TryAllocate() except a non-default alignment can be specified. It
   /// should be a power-of-two in [1, alignof(std::max_align_t)].
   uint8_t* TryAllocateAligned(int64_t size, int alignment) noexcept {
+    DFAKE_SCOPED_LOCK(mutex_);
     DCHECK_GE(alignment, 1);
     DCHECK_LE(alignment, alignof(std::max_align_t));
     DCHECK_EQ(BitUtil::RoundUpToPowerOfTwo(alignment), alignment);
@@ -126,6 +132,7 @@ class MemPool {
 
   /// Same as TryAllocate() except returned memory is not aligned at all.
   uint8_t* TryAllocateUnaligned(int64_t size) noexcept {
+    DFAKE_SCOPED_LOCK(mutex_);
     // Call templated implementation directly so that it is inlined here and the
     // alignment logic can be optimised out.
     return Allocate<true>(size, 1);
@@ -135,6 +142,7 @@ class MemPool {
   /// only be used to return either all or part of the previous allocation returned
   /// by Allocate().
   void ReturnPartialAllocation(int64_t byte_size) {
+    DFAKE_SCOPED_LOCK(mutex_);
     DCHECK_GE(byte_size, 0);
     DCHECK(current_chunk_idx_ != -1);
     ChunkInfo& info = chunks_[current_chunk_idx_];
@@ -208,6 +216,9 @@ class MemPool {
   /// TryAllocateAligned().
   static uint32_t zero_length_region_ alignas(std::max_align_t);
 
+  /// Ensures a MemPool is not used by two threads concurrently.
+  DFAKE_MUTEX(mutex_);
+
   /// chunk from which we served the last Allocate() call;
   /// always points to the last chunk that contains allocated data;
   /// chunks 0..current_chunk_idx_ - 1 are guaranteed to contain data

http://git-wip-us.apache.org/repos/asf/impala/blob/9f5c5e6d/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index ffb0a22..85c9625 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -50,7 +50,6 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s
   : state_(state),
     filter_mem_tracker_(
         new MemTracker(-1, "Runtime Filter Bank", state->instance_mem_tracker(), false)),
-    mem_pool_(filter_mem_tracker_.get()),
     closed_(false),
     total_bloom_filter_mem_required_(total_filter_mem_required) {
   bloom_memory_allocated_ =
@@ -206,7 +205,8 @@ void RuntimeFilterBank::PublishGlobalFilter(const TPublishFilterParams& params)
     DCHECK(it->second->is_min_max_filter());
     DCHECK(params.__isset.min_max_filter);
     min_max_filter = MinMaxFilter::Create(
-        params.min_max_filter, it->second->type(), &obj_pool_, &mem_pool_);
+        params.min_max_filter, it->second->type(), &obj_pool_, filter_mem_tracker_.get());
+    min_max_filters_.push_back(min_max_filter);
   }
   it->second->SetFilter(bloom_filter, min_max_filter);
   state_->runtime_profile()->AddInfoString(
@@ -247,7 +247,10 @@ MinMaxFilter* RuntimeFilterBank::AllocateScratchMinMaxFilter(
   RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
   DCHECK(it != produced_filters_.end()) << "Filter ID " << filter_id << " not registered";
 
-  return MinMaxFilter::Create(type, &obj_pool_, &mem_pool_);
+  MinMaxFilter* min_max_filter =
+      MinMaxFilter::Create(type, &obj_pool_, filter_mem_tracker_.get());
+  min_max_filters_.push_back(min_max_filter);
+  return min_max_filter;
 }
 
 bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv) {
@@ -260,8 +263,8 @@ void RuntimeFilterBank::Close() {
   lock_guard<mutex> l(runtime_filter_lock_);
   closed_ = true;
   for (BloomFilter* filter : bloom_filters_) filter->Close();
+  for (MinMaxFilter* filter : min_max_filters_) filter->Close();
   obj_pool_.Clear();
-  mem_pool_.FreeAll();
   if (buffer_pool_client_.is_registered()) {
     VLOG_FILE << "RuntimeFilterBank (Fragment Id: "
               << PrintId(state_->fragment_instance_id())

http://git-wip-us.apache.org/repos/asf/impala/blob/9f5c5e6d/be/src/runtime/runtime-filter-bank.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index ead7e82..907d6c4 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -152,9 +152,6 @@ class RuntimeFilterBank {
   /// MemTracker to track Bloom filter memory.
   boost::scoped_ptr<MemTracker> filter_mem_tracker_;
 
-  // Mem pool to track allocations made by filters.
-  MemPool mem_pool_;
-
   /// True iff Close() has been called. Used to prevent races between
   /// AllocateScratchBloomFilter() and Close().
   bool closed_;
@@ -166,9 +163,13 @@ class RuntimeFilterBank {
   long total_bloom_filter_mem_required_;
 
   /// Contains references to all the bloom filters generated. Used in Close() to safely
-  /// release all memory allocated for Bloomfilters.
+  /// release all memory allocated for BloomFilters.
   vector<BloomFilter*> bloom_filters_;
 
+  /// Contains references to all the min-max filters generated. Used in Close() to safely
+  /// release all memory allocated for MinMaxFilters.
+  vector<MinMaxFilter*> min_max_filters_;
+
   /// Buffer pool client for the filter bank. Initialized with the required reservation
   /// in ClaimBufferReservation(). Reservations are returned to the initial reservations
   /// pool in Close().

http://git-wip-us.apache.org/repos/asf/impala/blob/9f5c5e6d/be/src/util/min-max-filter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/min-max-filter-test.cc b/be/src/util/min-max-filter-test.cc
index 23712e6..37d6f83 100644
--- a/be/src/util/min-max-filter-test.cc
+++ b/be/src/util/min-max-filter-test.cc
@@ -31,11 +31,10 @@ using namespace impala;
 // inserted into it, and that MinMaxFilter::Or works for bools.
 TEST(MinMaxFilterTest, TestBoolMinMaxFilter) {
   MemTracker mem_tracker;
-  MemPool mem_pool(&mem_tracker);
   ObjectPool obj_pool;
 
-  MinMaxFilter* filter =
-      MinMaxFilter::Create(ColumnType(PrimitiveType::TYPE_BOOLEAN), &obj_pool, &mem_pool);
+  MinMaxFilter* filter = MinMaxFilter::Create(
+      ColumnType(PrimitiveType::TYPE_BOOLEAN), &obj_pool, &mem_tracker);
   EXPECT_TRUE(filter->AlwaysFalse());
   bool b1 = true;
   filter->Insert(&b1);
@@ -58,6 +57,8 @@ TEST(MinMaxFilterTest, TestBoolMinMaxFilter) {
   MinMaxFilter::Or(tFilter1, &tFilter2);
   EXPECT_FALSE(tFilter2.min.bool_val);
   EXPECT_TRUE(tFilter2.max.bool_val);
+
+  filter->Close();
 }
 
 void CheckIntVals(MinMaxFilter* filter, int32_t min, int32_t max) {
@@ -73,11 +74,10 @@ void CheckIntVals(MinMaxFilter* filter, int32_t min, int32_t max) {
 // generated with maxcros and the logic is identical.
 TEST(MinMaxFilterTest, TestNumericMinMaxFilter) {
   MemTracker mem_tracker;
-  MemPool mem_pool(&mem_tracker);
   ObjectPool obj_pool;
 
   ColumnType int_type(PrimitiveType::TYPE_INT);
-  MinMaxFilter* int_filter = MinMaxFilter::Create(int_type, &obj_pool, &mem_pool);
+  MinMaxFilter* int_filter = MinMaxFilter::Create(int_type, &obj_pool, &mem_tracker);
 
   // Test the behavior of an empty filter.
   EXPECT_TRUE(int_filter->AlwaysFalse());
@@ -89,7 +89,7 @@ TEST(MinMaxFilterTest, TestNumericMinMaxFilter) {
   EXPECT_FALSE(tFilter.min.__isset.int_val);
   EXPECT_FALSE(tFilter.max.__isset.int_val);
   MinMaxFilter* empty_filter =
-      MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_pool);
+      MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_tracker);
   EXPECT_TRUE(empty_filter->AlwaysFalse());
   EXPECT_FALSE(empty_filter->AlwaysTrue());
 
@@ -113,7 +113,7 @@ TEST(MinMaxFilterTest, TestNumericMinMaxFilter) {
   EXPECT_EQ(tFilter.min.int_val, i4);
   EXPECT_EQ(tFilter.max.int_val, i2);
   MinMaxFilter* int_filter2 =
-      MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_pool);
+      MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_tracker);
   CheckIntVals(int_filter2, i4, i2);
 
   // Check the behavior of Or.
@@ -126,6 +126,10 @@ TEST(MinMaxFilterTest, TestNumericMinMaxFilter) {
   MinMaxFilter::Or(tFilter1, &tFilter2);
   EXPECT_EQ(tFilter2.min.int_val, 2);
   EXPECT_EQ(tFilter2.max.int_val, 8);
+
+  int_filter->Close();
+  empty_filter->Close();
+  int_filter2->Close();
 }
 
 void CheckStringVals(MinMaxFilter* filter, const string& min, const string& max) {
@@ -146,10 +150,9 @@ void CheckStringVals(MinMaxFilter* filter, const string& min, const string& max)
 TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   ObjectPool obj_pool;
   MemTracker mem_tracker;
-  MemPool mem_pool(&mem_tracker);
 
   ColumnType string_type(PrimitiveType::TYPE_STRING);
-  MinMaxFilter* filter = MinMaxFilter::Create(string_type, &obj_pool, &mem_pool);
+  MinMaxFilter* filter = MinMaxFilter::Create(string_type, &obj_pool, &mem_tracker);
 
   // Test the behavior of an empty filter.
   EXPECT_TRUE(filter->AlwaysFalse());
@@ -163,7 +166,7 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   EXPECT_FALSE(tFilter.always_true);
 
   MinMaxFilter* empty_filter =
-      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_pool);
+      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_tracker);
   EXPECT_TRUE(empty_filter->AlwaysFalse());
   EXPECT_FALSE(empty_filter->AlwaysTrue());
 
@@ -229,7 +232,7 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   EXPECT_EQ(tFilter.max.string_val, truncTrailMaxChar);
 
   MinMaxFilter* filter2 =
-      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_pool);
+      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_tracker);
   CheckStringVals(filter2, b1024, truncTrailMaxChar);
 
   // Check that if the entire string is the max char and therefore after truncating for
@@ -249,15 +252,12 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   EXPECT_TRUE(tFilter.always_true);
 
   MinMaxFilter* always_true_filter =
-      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_pool);
+      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_tracker);
   EXPECT_FALSE(always_true_filter->AlwaysFalse());
   EXPECT_TRUE(always_true_filter->AlwaysTrue());
 
-  mem_pool.FreeAll();
-
   // Check that a filter that hits the mem limit is disabled.
   MemTracker limit_mem_tracker(1);
-  MemPool limit_mem_pool(&limit_mem_tracker);
   // We do not want to start the webserver.
   FLAGS_enable_webserver = false;
   std::unique_ptr<TestEnv> env;
@@ -265,7 +265,7 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   ASSERT_OK(env->Init());
 
   MinMaxFilter* limit_filter =
-      MinMaxFilter::Create(string_type, &obj_pool, &limit_mem_pool);
+      MinMaxFilter::Create(string_type, &obj_pool, &limit_mem_tracker);
   EXPECT_FALSE(limit_filter->AlwaysTrue());
   limit_filter->Insert(&cVal);
   limit_filter->MaterializeValues();
@@ -288,6 +288,12 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   MinMaxFilter::Or(tFilter1, &tFilter2);
   EXPECT_EQ(tFilter2.min.string_val, "a");
   EXPECT_EQ(tFilter2.max.string_val, "e");
+
+  filter->Close();
+  empty_filter->Close();
+  filter2->Close();
+  limit_filter->Close();
+  always_true_filter->Close();
 }
 
 void CheckTimestampVals(
@@ -303,9 +309,8 @@ void CheckTimestampVals(
 TEST(MinMaxFilterTest, TestTimestampMinMaxFilter) {
   ObjectPool obj_pool;
   MemTracker mem_tracker;
-  MemPool mem_pool(&mem_tracker);
   ColumnType timestamp_type(PrimitiveType::TYPE_TIMESTAMP);
-  MinMaxFilter* filter = MinMaxFilter::Create(timestamp_type, &obj_pool, &mem_pool);
+  MinMaxFilter* filter = MinMaxFilter::Create(timestamp_type, &obj_pool, &mem_tracker);
 
   // Test the behavior of an empty filter.
   EXPECT_TRUE(filter->AlwaysFalse());
@@ -317,7 +322,7 @@ TEST(MinMaxFilterTest, TestTimestampMinMaxFilter) {
   EXPECT_FALSE(tFilter.min.__isset.timestamp_val);
   EXPECT_FALSE(tFilter.max.__isset.timestamp_val);
   MinMaxFilter* empty_filter =
-      MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_pool);
+      MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_tracker);
   EXPECT_TRUE(empty_filter->AlwaysFalse());
   EXPECT_FALSE(empty_filter->AlwaysTrue());
 
@@ -341,7 +346,7 @@ TEST(MinMaxFilterTest, TestTimestampMinMaxFilter) {
   EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter.min), t2);
   EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter.max), t3);
   MinMaxFilter* filter2 =
-      MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_pool);
+      MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_tracker);
   CheckTimestampVals(filter2, t2, t3);
 
   // Check the behavior of Or.
@@ -354,6 +359,10 @@ TEST(MinMaxFilterTest, TestTimestampMinMaxFilter) {
   MinMaxFilter::Or(tFilter1, &tFilter2);
   EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter2.min), t2);
   EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter2.max), t3);
+
+  filter->Close();
+  empty_filter->Close();
+  filter2->Close();
 }
 
 int main(int argc, char** argv) {

http://git-wip-us.apache.org/repos/asf/impala/blob/9f5c5e6d/be/src/util/min-max-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/util/min-max-filter.cc b/be/src/util/min-max-filter.cc
index f50f896..5a8bfc7 100644
--- a/be/src/util/min-max-filter.cc
+++ b/be/src/util/min-max-filter.cc
@@ -199,8 +199,11 @@ NUMERIC_MIN_MAX_FILTER_NO_CAST(Double);
 const char* StringMinMaxFilter::LLVM_CLASS_NAME = "class.impala::StringMinMaxFilter";
 const int StringMinMaxFilter::MAX_BOUND_LENGTH = 1024;
 
-StringMinMaxFilter::StringMinMaxFilter(const TMinMaxFilter& thrift, MemPool* mem_pool)
-  : min_buffer_(mem_pool), max_buffer_(mem_pool) {
+StringMinMaxFilter::StringMinMaxFilter(
+    const TMinMaxFilter& thrift, MemTracker* mem_tracker)
+  : mem_pool_(mem_tracker),
+    min_buffer_(&mem_pool_),
+    max_buffer_(&mem_pool_) {
   always_false_ = thrift.always_false;
   always_true_ = thrift.always_true;
   if (!always_true_ && !always_false_) {
@@ -388,7 +391,8 @@ bool MinMaxFilter::GetCastIntMinMax(
   return true;
 }
 
-MinMaxFilter* MinMaxFilter::Create(ColumnType type, ObjectPool* pool, MemPool* mem_pool) {
+MinMaxFilter* MinMaxFilter::Create(
+    ColumnType type, ObjectPool* pool, MemTracker* mem_tracker) {
   switch (type.type) {
     case PrimitiveType::TYPE_BOOLEAN:
       return pool->Add(new BoolMinMaxFilter());
@@ -405,7 +409,7 @@ MinMaxFilter* MinMaxFilter::Create(ColumnType type, ObjectPool* pool, MemPool* m
     case PrimitiveType::TYPE_DOUBLE:
       return pool->Add(new DoubleMinMaxFilter());
     case PrimitiveType::TYPE_STRING:
-      return pool->Add(new StringMinMaxFilter(mem_pool));
+      return pool->Add(new StringMinMaxFilter(mem_tracker));
     case PrimitiveType::TYPE_TIMESTAMP:
       return pool->Add(new TimestampMinMaxFilter());
     default:
@@ -414,8 +418,8 @@ MinMaxFilter* MinMaxFilter::Create(ColumnType type, ObjectPool* pool, MemPool* m
   return nullptr;
 }
 
-MinMaxFilter* MinMaxFilter::Create(
-    const TMinMaxFilter& thrift, ColumnType type, ObjectPool* pool, MemPool* mem_pool) {
+MinMaxFilter* MinMaxFilter::Create(const TMinMaxFilter& thrift, ColumnType type,
+    ObjectPool* pool, MemTracker* mem_tracker) {
   switch (type.type) {
     case PrimitiveType::TYPE_BOOLEAN:
       return pool->Add(new BoolMinMaxFilter(thrift));
@@ -432,7 +436,7 @@ MinMaxFilter* MinMaxFilter::Create(
     case PrimitiveType::TYPE_DOUBLE:
       return pool->Add(new DoubleMinMaxFilter(thrift));
     case PrimitiveType::TYPE_STRING:
-      return pool->Add(new StringMinMaxFilter(thrift, mem_pool));
+      return pool->Add(new StringMinMaxFilter(thrift, mem_tracker));
     case PrimitiveType::TYPE_TIMESTAMP:
       return pool->Add(new TimestampMinMaxFilter(thrift));
     default:

http://git-wip-us.apache.org/repos/asf/impala/blob/9f5c5e6d/be/src/util/min-max-filter.h
----------------------------------------------------------------------
diff --git a/be/src/util/min-max-filter.h b/be/src/util/min-max-filter.h
index 556f5fa..7982500 100644
--- a/be/src/util/min-max-filter.h
+++ b/be/src/util/min-max-filter.h
@@ -27,7 +27,7 @@
 
 namespace impala {
 
-class MemPool;
+class MemTracker;
 class ObjectPool;
 
 /// A MinMaxFilter tracks the min and max currently seen values in a data set for use in
@@ -42,6 +42,7 @@ class ObjectPool;
 class MinMaxFilter {
  public:
   virtual ~MinMaxFilter() {}
+  virtual void Close() {}
 
   /// Returns the min/max values in the tuple slot representation. It is not valid to call
   /// these functions if AlwaysFalse() returns true.
@@ -77,13 +78,13 @@ class MinMaxFilter {
 
   virtual std::string DebugString() const = 0;
 
-  /// Returns a new MinMaxFilter with the given type, allocated from 'pool'.
-  static MinMaxFilter* Create(ColumnType type, ObjectPool* pool, MemPool* mem_pool);
+  /// Returns a new MinMaxFilter of 'type' from 'pool'; string bounds use 'mem_tracker'.
+  static MinMaxFilter* Create(ColumnType type, ObjectPool* pool, MemTracker* mem_tracker);
 
   /// Returns a new MinMaxFilter created from the thrift representation, allocated from
-  /// 'pool'.
-  static MinMaxFilter* Create(
-      const TMinMaxFilter& thrift, ColumnType type, ObjectPool* pool, MemPool* mem_pool);
+  /// 'pool'; string bounds use 'mem_tracker'.
+  static MinMaxFilter* Create(const TMinMaxFilter& thrift, ColumnType type,
+      ObjectPool* pool, MemTracker* mem_tracker);
 
   /// Computes the logical OR of 'in' with 'out' and stores the result in 'out'.
   static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);
@@ -139,13 +140,15 @@ NUMERIC_MIN_MAX_FILTER(Double, double);
 
 class StringMinMaxFilter : public MinMaxFilter {
  public:
-  StringMinMaxFilter(MemPool* mem_pool)
-    : min_buffer_(mem_pool),
-      max_buffer_(mem_pool),
+  explicit StringMinMaxFilter(MemTracker* mem_tracker)
+    : mem_pool_(mem_tracker),
+      min_buffer_(&mem_pool_),
+      max_buffer_(&mem_pool_),
       always_false_(true),
       always_true_(false) {}
-  StringMinMaxFilter(const TMinMaxFilter& thrift, MemPool* mem_pool);
+  StringMinMaxFilter(const TMinMaxFilter& thrift, MemTracker* mem_tracker);
   virtual ~StringMinMaxFilter() {}
+  virtual void Close() override { mem_pool_.FreeAll(); }
 
   virtual void* GetMin() override { return &min_; }
   virtual void* GetMax() override { return &max_; }
@@ -182,6 +185,9 @@ class StringMinMaxFilter : public MinMaxFilter {
   /// into this filter that are longer than this will be truncated.
   static const int MAX_BOUND_LENGTH;
 
+  /// MemPool that 'min_buffer_'/'max_buffer_' are allocated from.
+  MemPool mem_pool_;
+
   /// The min/max values. After a call to MaterializeValues() these will point to
   /// 'min_buffer_'/'max_buffer_'.
   StringValue min_;

http://git-wip-us.apache.org/repos/asf/impala/blob/9f5c5e6d/testdata/workloads/functional-query/queries/QueryTest/min_max_filters.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/min_max_filters.test b/testdata/workloads/functional-query/queries/QueryTest/min_max_filters.test
index c87cc74..5e2ec29 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/min_max_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/min_max_filters.test
@@ -119,3 +119,27 @@ where a.tinyint_col = v.x
 ---- RESULTS
 730
 ====
+
+
+---- QUERY
+####################################################
+# Test case 4: Two StringMinMaxFilters generated in the same fragment (IMPALA-7272).
+###################################################
+select straight_join a.l_orderkey
+from tpch_kudu.lineitem a
+  inner join /* +shuffle */ tpch_kudu.lineitem b on a.l_comment = b.l_comment
+  inner join /* +shuffle */ tpch_kudu.lineitem c on b.l_comment = c.l_comment
+where a.l_orderkey = 1
+---- TYPES
+BIGINT
+---- RESULTS
+1
+1
+1
+1
+1
+1
+1
+1
+1
+====
\ No newline at end of file