Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/23 06:40:11 UTC

[1/6] incubator-impala git commit: IMPALA-992: Rerun past queries from history in shell

Repository: incubator-impala
Updated Branches:
  refs/heads/master ed87c4060 -> 679ebc1ac


IMPALA-992: Rerun past queries from history in shell

This patch adds a new command "rerun" and a shortcut "@" to impala-shell.
Users can rerun a certain query by its index as given by the history command.
A valid index is an integer in [1, history_length] or
[-history_length, -1]. Negative values index history in reverse order.
For example, "@1;" or "rerun 1;" reruns the first query shown in history
and "@-1;" reruns the last query. The rerun command itself won't appear
in history. History indexes are 1-based and increasing. Old entries
may be truncated when impala-shell starts, and the remaining entries are
renumbered from 1, so the same index may refer to different commands
across multiple impala-shell instances.
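
For illustration, here is a minimal Python sketch of the index mapping
described above (the function name is hypothetical, not the shell's own):

    def resolve_rerun_index(cmd_idx, history_len):
        # Valid indexes: [1, history_len] or [-history_len, -1]; a negative
        # index counts from the end of the history.
        if not (0 < cmd_idx <= history_len or -history_len <= cmd_idx < 0):
            raise ValueError("Command index out of range")
        return cmd_idx + history_len + 1 if cmd_idx < 0 else cmd_idx

    assert resolve_rerun_index(1, 3) == 1   # "@1;" reruns the first query
    assert resolve_rerun_index(-1, 3) == 3  # "@-1;" reruns the last query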

Testing: A test case test_rerun is added to
shell/test_shell_interactive.py

Change-Id: Ifc28e8ce07845343267224c3b9ccb71b29a524d2
Reviewed-on: http://gerrit.cloudera.org:8080/7674
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c871e007
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c871e007
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c871e007

Branch: refs/heads/master
Commit: c871e007bed662732944fd5bb8d88e306ff4865b
Parents: ed87c40
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Mon Aug 14 19:03:09 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Aug 23 03:34:45 2017 +0000

----------------------------------------------------------------------
 shell/impala_shell.py                 | 49 +++++++++++++++++++++++++++---
 tests/shell/test_shell_interactive.py | 36 ++++++++++++++++++++++
 2 files changed, 81 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c871e007/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index c45f932..6d9df4f 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -46,6 +46,8 @@ from thrift.Thrift import TException
 
 VERSION_FORMAT = "Impala Shell v%(version)s (%(git_hash)s) built on %(build_date)s"
 VERSION_STRING = "build version not available"
+READLINE_UNAVAILABLE_ERROR = "The readline module was either not found or disabled. " \
+                             "Command history will not be collected."
 
 # Tarball / packaging build makes impala_build_version available
 try:
@@ -76,7 +78,7 @@ class ImpalaPrettyTable(prettytable.PrettyTable):
       value = unicode(value, self.encoding, "replace")
     return value
 
-class ImpalaShell(cmd.Cmd):
+class ImpalaShell(object, cmd.Cmd):
   """ Simple Impala Shell.
 
   Basic usage: type connect <host:port> to connect to an impalad
@@ -1068,15 +1070,40 @@ class ImpalaShell(cmd.Cmd):
 
   def do_history(self, args):
     """Display command history"""
-    # Deal with readline peculiarity. When history does not exists,
+    # Deal with readline peculiarity. When history does not exist,
     # readline returns 1 as the history length and stores 'None' at index 0.
     if self.readline and self.readline.get_current_history_length() > 0:
       for index in xrange(1, self.readline.get_current_history_length() + 1):
         cmd = self.readline.get_history_item(index)
         print_to_stderr('[%d]: %s' % (index, cmd))
     else:
-      print_to_stderr("The readline module was either not found or disabled. Command "
-                      "history will not be collected.")
+      print_to_stderr(READLINE_UNAVAILABLE_ERROR)
+
+  def do_rerun(self, args):
+    """Rerun a command with an command index in history
+    Example: @1;
+    """
+    if not self.readline:
+      print_to_stderr(READLINE_UNAVAILABLE_ERROR)
+      return CmdStatus.ERROR
+    history_len = self.readline.get_current_history_length()
+    # The rerun command itself shouldn't appear in history, so remove it.
+    self.readline.remove_history_item(history_len - 1)
+    history_len -= 1
+    try:
+      cmd_idx = int(args)
+    except ValueError:
+      print_to_stderr("Command index to be rerun must be an integer.")
+      return CmdStatus.ERROR
+    if not (0 < cmd_idx <= history_len or -history_len <= cmd_idx < 0):
+      print_to_stderr("Command index out of range. Valid range: [1, {0}] and [-{0}, -1]"
+                      .format(history_len))
+      return CmdStatus.ERROR
+    if cmd_idx < 0:
+      cmd_idx += history_len + 1
+    cmd = self.readline.get_history_item(cmd_idx)
+    print_to_stderr("Rerunning " + cmd)
+    return self.onecmd(cmd.rstrip(";"))
 
   def do_tip(self, args):
     """Print a random tip"""
@@ -1124,6 +1151,20 @@ class ImpalaShell(cmd.Cmd):
         # The history file is not writable, disable readline.
         self._disable_readline()
 
+  def parseline(self, line):
+    """Parse the line into a command name and a string containing
+    the arguments.  Returns a tuple containing (command, args, line).
+    'command' and 'args' may be None if the line couldn't be parsed.
+    The 'line' in the returned tuple is the rewritten original line, with
+    leading and trailing whitespace removed and special characters
+    transformed into their aliases.
+    """
+    line = line.strip()
+    if line and line[0] == '@':
+      line = 'rerun ' + line[1:]
+    return super(ImpalaShell, self).parseline(line)
+
+
   def _replace_history_delimiters(self, src_delim, tgt_delim):
     """Replaces source_delim with target_delim for all items in history.
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c871e007/tests/shell/test_shell_interactive.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index 9222637..7b85a55 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -232,6 +232,42 @@ class TestImpalaShellInteractive(object):
       assert query in result.stderr, "'%s' not in '%s'" % (query, result.stderr)
 
   @pytest.mark.execute_serially
+  def test_rerun(self):
+    """Smoke test for the 'rerun' command"""
+    # Clear history first.
+    if os.path.exists(SHELL_HISTORY_FILE):
+      os.remove(SHELL_HISTORY_FILE)
+    assert not os.path.exists(SHELL_HISTORY_FILE)
+    child_proc = pexpect.spawn(SHELL_CMD)
+    child_proc.expect(":21000] >")
+    self._expect_with_cmd(child_proc, "@1", ("Command index out of range"))
+    self._expect_with_cmd(child_proc, "rerun -1", ("Command index out of range"))
+    self._expect_with_cmd(child_proc, "select 'first_command'", ("first_command"))
+    self._expect_with_cmd(child_proc, "rerun 1", ("first_command"))
+    self._expect_with_cmd(child_proc, "@ -1", ("first_command"))
+    self._expect_with_cmd(child_proc, "select 'second_command'", ("second_command"))
+    child_proc.sendline('history;')
+    child_proc.expect(":21000] >")
+    assert '[1]: select \'first_command\';' in child_proc.before
+    assert '[2]: select \'second_command\';' in child_proc.before
+    assert '[3]: history;' in child_proc.before
+    # Rerunning a command should not add a new entry to the history.
+    assert '[4]' not in child_proc.before
+    self._expect_with_cmd(child_proc, "@0", ("Command index out of range"))
+    self._expect_with_cmd(child_proc, "rerun   4", ("Command index out of range"))
+    self._expect_with_cmd(child_proc, "@-4", ("Command index out of range"))
+    self._expect_with_cmd(child_proc, " @ 3 ", ("second_command"))
+    self._expect_with_cmd(child_proc, "@-3", ("first_command"))
+    self._expect_with_cmd(child_proc, "@",
+                          ("Command index to be rerun must be an integer."))
+    self._expect_with_cmd(child_proc, "@1foo",
+                          ("Command index to be rerun must be an integer."))
+    self._expect_with_cmd(child_proc, "@1 2",
+                          ("Command index to be rerun must be an integer."))
+    self._expect_with_cmd(child_proc, "rerun1", ("Syntax error"))
+    child_proc.sendline('quit;')
+
+  @pytest.mark.execute_serially
   def test_tip(self):
     """Smoke test for the TIP command"""
     # Temporarily add impala_shell module to path to get at TIPS list for verification


[3/6] incubator-impala git commit: IMPALA-5352: Age out unused file handles from the cache

Posted by ta...@apache.org.
IMPALA-5352: Age out unused file handles from the cache

Currently, a file handle in the file handle cache will
only be evicted if the cache reaches its capacity. This
means that file handles can be retained for an indefinite
amount of time. This is true even for files that have
been deleted, replaced, or modified. Since a file handle
maintains a file descriptor for local files, this can
prevent the disk space from being freed. Additionally,
unused file handles are wasted memory.

This adds code to evict file handles that have been
unused for longer than a specified threshold. A thread
periodically checks the file handle cache to see if
any file handle should be evicted. The threshold is
specified by 'unused_file_handle_timeout_sec'; it
defaults to 6 hours.
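
As a rough sketch of that policy (plain Python with hypothetical names;
the real implementation is the templated C++ cache in the diff below),
the LRU list holds (handle, timestamp) pairs, and entries are dropped
from the front while the cache is over capacity or the oldest entry has
aged out:

    import time
    from collections import deque

    def evict_handles(lru, capacity, timeout_secs, now=None):
        # 'lru' is a deque of (handle, timestamp) pairs, oldest first.
        # A timeout of 0 disables aging, matching the flag's semantics.
        if now is None:
            now = time.monotonic()
        while lru:
            handle, ts = lru[0]
            over_capacity = len(lru) > capacity
            aged_out = timeout_secs > 0 and now - ts > timeout_secs
            if not (over_capacity or aged_out):
                return
            lru.popleft()  # evict the oldest unused handle

    lru = deque([("h1", 0.0), ("h2", 80.0)])
    evict_handles(lru, capacity=16, timeout_secs=30, now=100.0)
    assert [h for h, _ in lru] == ["h2"]  # only "h1" exceeded the timeout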

This adds a test to custom_cluster/test_hdfs_fd_caching.py
to verify the eviction behavior.

Change-Id: Iefe04b3e2e22123ecb8b3e494934c93dfb29682e
Reviewed-on: http://gerrit.cloudera.org:8080/7640
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/57dae5ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/57dae5ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/57dae5ec

Branch: refs/heads/master
Commit: 57dae5ec7e927a1c836f6bf0a1cbe5a81541327e
Parents: b6c0297
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Thu Jun 29 13:08:58 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Aug 23 04:44:21 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/disk-io-mgr-handle-cache.h       | 54 +++++++++---
 .../runtime/disk-io-mgr-handle-cache.inline.h   | 87 +++++++++++++++++---
 be/src/runtime/disk-io-mgr.cc                   | 20 ++++-
 tests/custom_cluster/test_hdfs_fd_caching.py    | 42 ++++++++--
 4 files changed, 172 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57dae5ec/be/src/runtime/disk-io-mgr-handle-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-handle-cache.h b/be/src/runtime/disk-io-mgr-handle-cache.h
index ddfb934..96add9f 100644
--- a/be/src/runtime/disk-io-mgr-handle-cache.h
+++ b/be/src/runtime/disk-io-mgr-handle-cache.h
@@ -29,6 +29,7 @@
 #include "util/aligned-new.h"
 #include "util/impalad-metrics.h"
 #include "util/spinlock.h"
+#include "util/thread.h"
 
 namespace impala {
 
@@ -73,19 +74,29 @@ class HdfsFileHandle {
 /// of concurrent connections, and file handles in the cache would be counted towards
 /// that limit.
 ///
-/// TODO: If there is a file handle in the cache and the underlying file is deleted,
+/// If there is a file handle in the cache and the underlying file is deleted,
 /// the file handle might keep the file from being deleted at the OS level. This can
-/// take up disk space and impact correctness. The cache should check periodically to
-/// evict file handles older than some configurable threshold. The cache should also
-/// evict file handles more aggressively if the file handle's mtime is older than the
-/// file's current mtime.
+/// take up disk space and impact correctness. To avoid this, the cache will evict any
+/// file handle that has been unused for longer than the threshold specified by
+/// `unused_handle_timeout_secs`. Eviction is disabled when the threshold is 0.
+///
+/// TODO: The cache should also evict file handles more aggressively if the file handle's
+/// mtime is older than the file's current mtime.
 template <size_t NUM_PARTITIONS>
 class FileHandleCache {
  public:
   /// Instantiates the cache with `capacity` split evenly across NUM_PARTITIONS
   /// partitions. If the capacity does not split evenly, then the capacity is rounded
-  /// up.
-  FileHandleCache(size_t capacity);
+  /// up. The cache will age out any file handle that is unused for
+  /// `unused_handle_timeout_secs` seconds. Aging out is disabled if this is set to zero.
+  FileHandleCache(size_t capacity, uint64_t unused_handle_timeout_secs);
+
+  /// Destructor is only called for backend tests
+  ~FileHandleCache();
+
+  /// Starts up a thread that monitors the age of file handles and evicts any that
+  /// exceed the limit.
+  void Init();
 
   /// Get a file handle from the cache for the specified filename (fname) and
   /// last modification time (mtime). This will hash the filename to determine
@@ -112,18 +123,26 @@ class FileHandleCache {
  private:
   struct FileHandleEntry;
   typedef std::multimap<std::string, FileHandleEntry> MapType;
-  typedef std::list<typename MapType::iterator> LruListType;
+
+  struct LruListEntry {
+    LruListEntry(typename MapType::iterator map_entry_in);
+    typename MapType::iterator map_entry;
+    uint64_t timestamp_seconds;
+  };
+  typedef std::list<LruListEntry> LruListType;
 
   struct FileHandleEntry {
-    FileHandleEntry(HdfsFileHandle *fh_in) : fh(fh_in) {}
+    FileHandleEntry(HdfsFileHandle* fh_in, LruListType& lru_list)
+    : fh(fh_in), lru_entry(lru_list.end()) {}
     std::unique_ptr<HdfsFileHandle> fh;
 
     /// in_use is true for a file handle checked out via GetFileHandle() that has not
     /// been returned via ReleaseFileHandle().
     bool in_use = false;
 
-    /// Iterator to this element's location in the LRU list. This only has a valid value
-    /// if in_use is false.
+    /// Iterator to this element's location in the LRU list. This only points to a
+    /// valid location when in_use is false. For error-checking, this is set to
+    /// lru_list.end() when in_use is true.
     typename LruListType::iterator lru_entry;
   };
 
@@ -151,11 +170,24 @@ class FileHandleCache {
     size_t size;
   };
 
+  /// Periodic check to evict unused file handles. Only executed by eviction_thread_.
+  void EvictHandlesLoop();
+  static const int64_t EVICT_HANDLES_PERIOD_MS = 1000;
+
   /// If the partition is above its capacity, evict the oldest unused file handles to
   /// enforce the capacity.
   void EvictHandles(FileHandleCachePartition& p);
 
   std::array<FileHandleCachePartition, NUM_PARTITIONS> cache_partitions_;
+
+  /// Maximum time before an unused file handle is aged out of the cache.
+  /// Aging out is disabled if this is set to 0.
+  uint64_t unused_handle_timeout_secs_;
+
+  /// Thread to check for unused file handles to evict. This thread will exit when
+  /// the shut_down_promise_ is set.
+  std::unique_ptr<Thread> eviction_thread_;
+  Promise<bool> shut_down_promise_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57dae5ec/be/src/runtime/disk-io-mgr-handle-cache.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-handle-cache.inline.h b/be/src/runtime/disk-io-mgr-handle-cache.inline.h
index ed8ed63..76bef95 100644
--- a/be/src/runtime/disk-io-mgr-handle-cache.inline.h
+++ b/be/src/runtime/disk-io-mgr-handle-cache.inline.h
@@ -15,8 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <tuple>
+
 #include "runtime/disk-io-mgr-handle-cache.h"
 #include "util/hash-util.h"
+#include "util/time.h"
 
 #ifndef IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H
 #define IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H
@@ -41,7 +44,9 @@ HdfsFileHandle::~HdfsFileHandle() {
 }
 
 template <size_t NUM_PARTITIONS>
-FileHandleCache<NUM_PARTITIONS>::FileHandleCache(size_t capacity) {
+FileHandleCache<NUM_PARTITIONS>::FileHandleCache(size_t capacity,
+    uint64_t unused_handle_timeout_secs)
+  : unused_handle_timeout_secs_(unused_handle_timeout_secs) {
   DCHECK_GT(NUM_PARTITIONS, 0);
   size_t remainder = capacity % NUM_PARTITIONS;
   size_t base_capacity = capacity / NUM_PARTITIONS;
@@ -53,6 +58,23 @@ FileHandleCache<NUM_PARTITIONS>::FileHandleCache(size_t capacity) {
 }
 
 template <size_t NUM_PARTITIONS>
+FileHandleCache<NUM_PARTITIONS>::LruListEntry::LruListEntry(
+    typename MapType::iterator map_entry_in)
+     : map_entry(map_entry_in), timestamp_seconds(MonotonicSeconds()) {}
+
+template <size_t NUM_PARTITIONS>
+FileHandleCache<NUM_PARTITIONS>::~FileHandleCache() {
+  shut_down_promise_.Set(true);
+  if (eviction_thread_ != nullptr) eviction_thread_->Join();
+}
+
+template <size_t NUM_PARTITIONS>
+void FileHandleCache<NUM_PARTITIONS>::Init() {
+  eviction_thread_.reset(new Thread("disk-io-mgr-handle-cache", "File Handle Timeout",
+      &FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop, this));
+}
+
+template <size_t NUM_PARTITIONS>
 HdfsFileHandle* FileHandleCache<NUM_PARTITIONS>::GetFileHandle(
     const hdfsFS& fs, std::string* fname, int64_t mtime, bool require_new_handle,
     bool* cache_hit) {
@@ -70,8 +92,13 @@ HdfsFileHandle* FileHandleCache<NUM_PARTITIONS>::GetFileHandle(
     while (range.first != range.second) {
       FileHandleEntry* elem = &range.first->second;
       if (!elem->in_use && elem->fh->mtime() == mtime) {
-        // remove from lru
+        // This element is currently in the lru_list, which means that lru_entry must
+        // be an iterator pointing into the lru_list.
+        DCHECK(elem->lru_entry != p.lru_list.end());
+        // Remove the element from the lru_list and designate that it is not on
+        // the lru_list by resetting its iterator to point to the end of the list.
         p.lru_list.erase(elem->lru_entry);
+        elem->lru_entry = p.lru_list.end();
         ret_elem = elem;
         *cache_hit = true;
         break;
@@ -83,14 +110,15 @@ HdfsFileHandle* FileHandleCache<NUM_PARTITIONS>::GetFileHandle(
   // There was no entry that was free or caller asked for a new handle
   if (!ret_elem) {
     *cache_hit = false;
-    // Create a new entry and put it in the map
+    // Create a new entry and move it into the map
     HdfsFileHandle* new_fh = new HdfsFileHandle(fs, fname->data(), mtime);
     if (!new_fh->ok()) {
       delete new_fh;
       return nullptr;
     }
-    typename MapType::iterator new_it = p.cache.emplace_hint(range.second, *fname,
-        new_fh);
+    FileHandleEntry entry(new_fh, p.lru_list);
+    typename MapType::iterator new_it = p.cache.emplace_hint(range.second,
+        *fname, std::move(entry));
     ret_elem = &new_it->second;
     ++p.size;
     if (p.size > p.capacity) EvictHandles(p);
@@ -139,7 +167,15 @@ void FileHandleCache<NUM_PARTITIONS>::ReleaseFileHandle(std::string* fname,
   // If unbuffering is not supported, then hdfsUnbufferFile() will return a non-zero
   // return code, and we close the file handle and remove it from the cache.
   if (hdfsUnbufferFile(release_elem->fh->file()) == 0) {
-    release_elem->lru_entry = p.lru_list.insert(p.lru_list.end(), release_it);
+    // This FileHandleEntry must not be in the lru list already, because it was
+    // in use. Verify this by checking that the lru_entry is pointing to the end,
+    // which cannot be true for any element in the lru list.
+    DCHECK(release_elem->lru_entry == p.lru_list.end());
+    // Add this to the lru list, establishing links in both directions.
+    // The FileHandleEntry has an iterator to the LruListEntry and the
+    // LruListEntry has an iterator to the location of the FileHandleEntry in
+    // the cache.
+    release_elem->lru_entry = p.lru_list.emplace(p.lru_list.end(), release_it);
     if (p.size > p.capacity) EvictHandles(p);
   } else {
     VLOG_FILE << "FS does not support file handle unbuffering, closing file="
@@ -150,13 +186,42 @@ void FileHandleCache<NUM_PARTITIONS>::ReleaseFileHandle(std::string* fname,
 }
 
 template <size_t NUM_PARTITIONS>
+void FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop() {
+  while (true) {
+    for (FileHandleCachePartition& p : cache_partitions_) {
+      boost::lock_guard<SpinLock> g(p.lock);
+      EvictHandles(p);
+    }
+    // This Get() times out on each iteration until shutdown, when the promise is set.
+    bool timed_out;
+    shut_down_promise_.Get(EVICT_HANDLES_PERIOD_MS, &timed_out);
+    if (!timed_out) break;
+  }
+  // The promise must be set to true.
+  DCHECK(shut_down_promise_.IsSet());
+  DCHECK(shut_down_promise_.Get());
+}
+
+template <size_t NUM_PARTITIONS>
 void FileHandleCache<NUM_PARTITIONS>::EvictHandles(
     FileHandleCache<NUM_PARTITIONS>::FileHandleCachePartition& p) {
-  while (p.size > p.capacity) {
-    if (p.lru_list.size() == 0) break;
-    typename MapType::iterator evict_it = p.lru_list.front();
-    DCHECK(!evict_it->second.in_use);
-    p.cache.erase(evict_it);
+  uint64_t now = MonotonicSeconds();
+  uint64_t oldest_allowed_timestamp =
+      now > unused_handle_timeout_secs_ ? now - unused_handle_timeout_secs_ : 0;
+  while (p.lru_list.size() > 0) {
+    // Peek at the oldest element
+    LruListEntry oldest_entry = p.lru_list.front();
+    typename MapType::iterator oldest_entry_map_it = oldest_entry.map_entry;
+    uint64_t oldest_entry_timestamp = oldest_entry.timestamp_seconds;
+    // If the oldest element does not need to be aged out and the cache is not over
+    // capacity, then we are done and there is nothing to evict.
+    if (p.size <= p.capacity && (unused_handle_timeout_secs_ == 0 ||
+        oldest_entry_timestamp >= oldest_allowed_timestamp)) {
+      return;
+    }
+    // Evict the oldest element
+    DCHECK(!oldest_entry_map_it->second.in_use);
+    p.cache.erase(oldest_entry_map_it);
     p.lru_list.pop_front();
     --p.size;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57dae5ec/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 0fdfb77..dff6ec5 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -100,6 +100,19 @@ DEFINE_int32(max_free_io_buffers, 128,
 DEFINE_uint64(max_cached_file_handles, 20000, "Maximum number of HDFS file handles "
     "that will be cached. Disabled if set to 0.");
 
+// The unused file handle timeout specifies how long a file handle will remain in the
+// cache if it is not being used. Aging out unused handles ensures that the cache is not
+// wasting memory on handles that aren't useful. This allows users to specify a larger
+// cache size, as the system will only use the memory on useful file handles.
+// Additionally, cached file handles keep an open file descriptor for local files.
+// If a file is deleted through HDFS, this open file descriptor can keep the disk space
+// from being freed. Once the metadata reflects that a file has been deleted, the file
+// handle will no longer be used by future queries, so aging it out allows the
+// disk space to be freed within a reasonable period of time.
+DEFINE_uint64(unused_file_handle_timeout_sec, 21600, "Maximum time, in seconds, that an "
+    "unused HDFS file handle will remain in the file handle cache. Disabled if set "
+    "to 0.");
+
 // The IoMgr is able to run with a wide range of memory usage. If a query has memory
 // remaining less than this value, the IoMgr will stop all buffering regardless of the
 // current queue size.
@@ -289,7 +302,8 @@ DiskIoMgr::DiskIoMgr() :
     total_bytes_read_counter_(TUnit::BYTES),
     read_timer_(TUnit::TIME_NS),
     file_handle_cache_(min(FLAGS_max_cached_file_handles,
-        FileSystemUtil::MaxNumFileHandles())) {
+        FileSystemUtil::MaxNumFileHandles()),
+        FLAGS_unused_file_handle_timeout_sec) {
   int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
   free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
   int num_local_disks = DiskInfo::num_disks();
@@ -314,7 +328,8 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
     total_bytes_read_counter_(TUnit::BYTES),
     read_timer_(TUnit::TIME_NS),
     file_handle_cache_(min(FLAGS_max_cached_file_handles,
-        FileSystemUtil::MaxNumFileHandles())) {
+        FileSystemUtil::MaxNumFileHandles()),
+        FLAGS_unused_file_handle_timeout_sec) {
   int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
   free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
   if (num_local_disks == 0) num_local_disks = DiskInfo::num_disks();
@@ -396,6 +411,7 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
     }
   }
   request_context_cache_.reset(new RequestContextCache(this));
+  file_handle_cache_.Init();
 
   cached_read_options_ = hadoopRzOptionsAlloc();
   DCHECK(cached_read_options_ != nullptr);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57dae5ec/tests/custom_cluster/test_hdfs_fd_caching.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_hdfs_fd_caching.py b/tests/custom_cluster/test_hdfs_fd_caching.py
index 9dcd3bf..ad80cef 100644
--- a/tests/custom_cluster/test_hdfs_fd_caching.py
+++ b/tests/custom_cluster/test_hdfs_fd_caching.py
@@ -23,6 +23,7 @@ from tests.util.filesystem_utils import (
     IS_ISILON,
     IS_S3,
     IS_ADLS)
+from time import sleep
 
 @SkipIfLocal.hdfs_fd_caching
 class TestHdfsFdCaching(CustomClusterTestSuite):
@@ -59,7 +60,8 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
     super(TestHdfsFdCaching, self).teardown_method(method)
     self.client.execute("drop database if exists cachefd cascade")
 
-  def run_fd_caching_test(self, vector, caching_expected, cache_capacity):
+  def run_fd_caching_test(self, vector, caching_expected, cache_capacity,
+      eviction_timeout_secs):
     """
     Tests that HDFS file handles are cached as expected. This is used both
     for the positive and negative test cases. If caching_expected is true,
@@ -103,26 +105,52 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
     # Read all the files of the table and make sure no FD leak
     for x in range(10):
       self.execute_query("select count(*) from cachefd.simple;", vector=vector)
-      assert self.max_cached_handles() <= 16
+      assert self.max_cached_handles() <= cache_capacity
       if not caching_expected:
         assert self.cached_handles() == num_handles_start
     assert self.outstanding_handles() == 0
 
+    if caching_expected and eviction_timeout_secs is not None:
+      # To test unused file handle eviction, sleep for longer than the timeout.
+      # All the cached handles should be evicted.
+      sleep(eviction_timeout_secs + 5)
+      assert self.cached_handles() == 0
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args="--max_cached_file_handles=16",
+      impalad_args="--max_cached_file_handles=16 " +
+          " --unused_file_handle_timeout_sec=18446744073709551600",
       catalogd_args="--load_catalog_in_background=false")
   def test_caching_enabled(self, vector):
-    """Test of the HDFS file handle cache with the parameter specified"""
+    """
+    Test of the HDFS file handle cache with the parameter specified and a very
+    large file handle timeout
+    """
+
     cache_capacity = 16
 
     # Caching only applies to local HDFS files. If this is local HDFS, then verify
     # that caching works. Otherwise, verify that file handles are not cached.
-    if (IS_S3 or IS_ADLS or IS_ISILON or pytest.config.option.testing_remote_cluster):
+    if IS_S3 or IS_ADLS or IS_ISILON or pytest.config.option.testing_remote_cluster:
       caching_expected = False
     else:
       caching_expected = True
-    self.run_fd_caching_test(vector, caching_expected, cache_capacity)
+    self.run_fd_caching_test(vector, caching_expected, cache_capacity, None)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--max_cached_file_handles=16 --unused_file_handle_timeout_sec=5",
+      catalogd_args="--load_catalog_in_background=false")
+  def test_caching_with_eviction(self, vector):
+    """Test of the HDFS file handle cache with unused file handle eviction enabled"""
+    cache_capacity = 16
+    handle_timeout = 5
+
+    # Only test eviction on platforms where caching is enabled.
+    if IS_S3 or IS_ADLS or IS_ISILON or pytest.config.option.testing_remote_cluster:
+      return
+    caching_expected = True
+    self.run_fd_caching_test(vector, caching_expected, cache_capacity, handle_timeout)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -132,7 +160,7 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
     """Test that the HDFS file handle cache is disabled when the parameter is zero"""
     cache_capacity = 0
     caching_expected = False
-    self.run_fd_caching_test(vector, caching_expected, cache_capacity)
+    self.run_fd_caching_test(vector, caching_expected, cache_capacity, None)
 
   def cached_handles(self):
     return self.get_agg_metric("impala-server.io.mgr.num-cached-file-handles")


[2/6] incubator-impala git commit: IMPALA-5788: Fix agg node crash when grouping by nondeterministic exprs

Posted by ta...@apache.org.
IMPALA-5788: Fix agg node crash when grouping by nondeterministic exprs

Fixed a bug where Impala crashes during execution of an aggregation
query that uses nondeterministic grouping expressions. The crash happens
when Impala tries to rebuild a spilled partition that fits in memory and
rows get re-hashed to a partition other than the spilled one due to the
nondeterministic expressions.
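
A minimal sketch of the fix (Python with hypothetical names; the real
change is in CreateHashPartitions() in the diff below): when a single
spilled partition is rebuilt, every fanout slot is aliased to it, so a
row that a nondeterministic expression re-hashes to a different index
still lands in the same in-memory partition:

    PARTITION_FANOUT = 16

    def create_hash_partitions(partitions, single_partition_idx=None):
        # Normally each fanout slot has its own partition; when rebuilding
        # a single spilled partition, alias all slots to that partition.
        if single_partition_idx is not None:
            partitions = [partitions[single_partition_idx]] * PARTITION_FANOUT
        return partitions

    partitions = ["p%d" % i for i in range(PARTITION_FANOUT)]
    rebuilt = create_hash_partitions(partitions, single_partition_idx=3)
    assert all(p == "p3" for p in rebuilt)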

Testing:
Added a query test to verify successful execution.

Change-Id: Ibdb09239577b3f0a19d710b0d148e882b0b73e23
Reviewed-on: http://gerrit.cloudera.org:8080/7714
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b6c02972
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b6c02972
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b6c02972

Branch: refs/heads/master
Commit: b6c02972d6bb8dc4c62ef806c6145acae95842ad
Parents: c871e00
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Wed Aug 16 17:45:06 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Aug 23 03:59:02 2017 +0000

----------------------------------------------------------------------
 be/src/exec/partitioned-aggregation-node.cc     | 23 +++++++++++++++++++-
 be/src/exec/partitioned-aggregation-node.h      | 11 ++++++----
 .../queries/QueryTest/spilling.test             | 13 +++++++++++
 3 files changed, 42 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b6c02972/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index a0fed41..b1d54a6 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -1152,6 +1152,17 @@ Status PartitionedAggregationNode::CreateHashPartitions(
     }
     hash_tbls_[i] = partition->hash_tbl.get();
   }
+  // In this case we did not have to repartition, so ensure that while building the hash
+  // table all rows will be inserted into the partition at 'single_partition_idx' in case
+  // a nondeterministic grouping expression causes a row to hash to a different
+  // partition index.
+  if (single_partition_idx != -1) {
+    Partition* partition = hash_partitions_[single_partition_idx];
+    for (int i = 0; i < PARTITION_FANOUT; ++i) {
+      hash_partitions_[i] = partition;
+      hash_tbls_[i] = partition->hash_tbl.get();
+    }
+  }
 
   COUNTER_ADD(partitions_created_, num_partitions_created);
   if (!is_streaming_preagg_) {
@@ -1390,7 +1401,13 @@ Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) {
   }
   DCHECK_NE(partition_idx, -1) << "Should have been able to spill a partition to "
                                << "reclaim memory: " << buffer_pool_client_.DebugString();
-  hash_tbls_[partition_idx] = NULL;
+  // Remove references to the destroyed hash table from 'hash_tbls_'.
+  // Additionally, we might be dealing with a rebuilt spilled partition, where all
+  // partitions point to a single in-memory partition. This also ensures that 'hash_tbls_'
+  // remains consistent in that case.
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    if (hash_partitions_[i] == hash_partitions_[partition_idx]) hash_tbls_[i] = nullptr;
+  }
   return hash_partitions_[partition_idx]->Spill(more_aggregate_rows);
 }
 
@@ -1402,6 +1419,10 @@ Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
   for (int i = 0; i < hash_partitions_.size(); ++i) {
     Partition* partition = hash_partitions_[i];
     if (partition == nullptr) continue;
+    // We might be dealing with a rebuilt spilled partition, where all partitions are
+    // pointing to a single in-memory partition, so make sure we only proceed for the
+    // right partition.
+    if (i != partition->idx) continue;
     int64_t aggregated_rows = 0;
     if (partition->aggregated_row_stream != nullptr) {
       aggregated_rows = partition->aggregated_row_stream->num_rows();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b6c02972/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index 210400e..fa8674c 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -336,10 +336,14 @@ class PartitionedAggregationNode : public ExecNode {
   /// Object pool that holds the Partition objects in hash_partitions_.
   boost::scoped_ptr<ObjectPool> partition_pool_;
 
-  /// Current partitions we are partitioning into.
+  /// Current partitions we are partitioning into. IMPALA-5788: For the case where we
+  /// rebuild a spilled partition that fits in memory, all pointers in this vector will
+  /// point to a single in-memory partition.
   std::vector<Partition*> hash_partitions_;
 
-  /// Cache for hash tables in 'hash_partitions_'.
+  /// Cache for hash tables in 'hash_partitions_'. IMPALA-5788: For the case where we
+  /// rebuild a spilled partition that fits in memory, all pointers in this array will
+  /// point to the hash table that is a part of a single in-memory partition.
   HashTable* hash_tbls_[PARTITION_FANOUT];
 
   /// All partitions that have been spilled and need further processing.
@@ -623,8 +627,7 @@ class PartitionedAggregationNode : public ExecNode {
 
   /// Initializes hash_partitions_. 'level' is the level for the partitions to create.
   /// If 'single_partition_idx' is provided, it must be a number in range
-  /// [0, PARTITION_FANOUT), and only that partition is created - the others are
-  /// initialized to NULL.
+  /// [0, PARTITION_FANOUT), and only that partition is created - all others point to it.
   /// Also sets ht_ctx_'s level to 'level'.
   Status CreateHashPartitions(
       int level, int single_partition_idx = -1) WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b6c02972/testdata/workloads/functional-query/queries/QueryTest/spilling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index b6f4f12..3868e4f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -343,3 +343,16 @@ bigint,bigint,bigint,int,decimal,decimal,decimal,decimal,string,string,string,st
 1382,156162,6163,5,31.00,37762.96,0.07,0.03,'R','F','1993-10-26','1993-10-15','1993-11-09','TAKE BACK RETURN','FOB','hely regular dependencies. f'
 1509,186349,3904,6,31.00,44495.54,0.04,0.03,'A','F','1993-07-14','1993-08-21','1993-08-06','COLLECT COD','SHIP','ic deposits cajole carefully. quickly bold '
 ====
+---- QUERY
+# Test spilling aggregation when grouping by nondeterministic expression
+set buffer_pool_limit=5m;
+set num_nodes=1;
+select l_orderkey, l_partkey, l_suppkey, l_linenumber, l_comment
+from tpch_parquet.lineitem
+group by 1, 2, 3, 4, 5, random()
+limit 5
+---- RUNTIME_PROFILE
+row_regex: .*Query State: FINISHED.*
+row_regex: .*Query Status: OK.*
+row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
+====


[5/6] incubator-impala git commit: IMPALA-5815: right outer join returns invalid memory

Posted by ta...@apache.org.
IMPALA-5815: right outer join returns invalid memory

The bug is that OutputAllBuild() called BufferedTupleStream::GetNext()
while 'out_batch' still referenced data from the current page
of the stream. When iterating over an unpinned stream, GetNext()
sets the 'needs_deep_copy' flag when it hits the end of a page,
so that the caller has an opportunity to flush or deep copy the
data. On the next call to GetNext(), that page may be deleted
or unpinned.

The fix is to check whether the batch is at capacity before
calling BTS::GetNext().
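
A sketch of the corrected loop shape (Python with hypothetical names; the
real change is in OutputAllBuild() in the diff below): hand the current
batch back to the caller whenever it is at capacity, and only then
advance the stream:

    def output_all_build(stream, out_batch):
        while not stream.eos:
            if out_batch.at_capacity():
                # The batch may still reference the stream's current page;
                # return it before GetNext() can unpin or delete that page.
                return
            stream.get_next(out_batch)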

This issue was masked by not using the 'delete_on_read' mode of the
stream, which would have freed the stream's buffers earlier and
increased the odds of ASAN detecting the problem.

Testing:
Running TestTPCHJoinQueries.test_outer_joins() reliably reproduced this
for me locally under ASAN. After the fix the problem does not reoccur.

Change-Id: Ia14148499ddaec41c2e70fef5d53e5d06ea0538d
Reviewed-on: http://gerrit.cloudera.org:8080/7772
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3b1a1df7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3b1a1df7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3b1a1df7

Branch: refs/heads/master
Commit: 3b1a1df7e30de7d21cc30eba8bc7d318ee063d5a
Parents: 5f32312
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Aug 18 13:10:51 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Aug 23 06:17:17 2017 +0000

----------------------------------------------------------------------
 be/src/exec/partitioned-hash-join-node.cc | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b1a1df7/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 2c656b0..13d660f 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -387,7 +387,7 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
     DCHECK(NeedToProcessUnmatchedBuildRows());
     bool got_read_buffer = false;
     RETURN_IF_ERROR(input_partition_->build_partition()->build_rows()->PrepareForRead(
-        false, &got_read_buffer));
+        true, &got_read_buffer));
     if (!got_read_buffer) {
       return mem_tracker()->MemLimitExceeded(
           runtime_state_, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
@@ -682,6 +682,12 @@ Status PartitionedHashJoinNode::OutputAllBuild(RowBatch* out_batch) {
     if (output_unmatched_batch_iter_->AtEnd()) {
       output_unmatched_batch_->TransferResourceOwnership(out_batch);
       output_unmatched_batch_->Reset();
+      // Need to flush any resources attached to 'out_batch' before calling
+      // build_rows()->GetNext().  E.g. if the previous call to GetNext() set the
+      // 'needs_deep_copy' flag, then calling GetNext() before we return the current
+      // batch would leave the batch referencing invalid memory (see IMPALA-5815).
+      if (out_batch->AtCapacity()) break;
+
       RETURN_IF_ERROR(output_build_partitions_.front()->build_rows()->GetNext(
           output_unmatched_batch_.get(), &eos));
       output_unmatched_batch_iter_.reset(


[6/6] incubator-impala git commit: IMPALA-5573: Add decimal codegen in text scanner

Posted by ta...@apache.org.
IMPALA-5573: Add decimal codegen in text scanner

This patch adds decimal type codegen support to the text scanner.
Previously, codegen was disabled if there was a decimal column. With this
patch StringParser::StringToDecimal is called in generated code. A new
file util/string-parser.cc is created and linked into libUtil. This file
contains proxy functions for StringToDecimal in order to keep
StringToDecimal out of LLVM IR.
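
A small sketch (Python with hypothetical names) of the dispatch the
codegen performs, selecting a parser symbol by the decimal slot's size
in bytes:

    DECIMAL_PARSERS = {
        4: "IrStringToDecimal4",
        8: "IrStringToDecimal8",
        16: "IrStringToDecimal16",
    }

    def parser_for_slot(slot_size):
        if slot_size not in DECIMAL_PARSERS:
            raise ValueError("Decimal slots can't be this size.")
        return DECIMAL_PARSERS[slot_size]

    assert parser_for_slot(8) == "IrStringToDecimal8"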

In a benchmark query:
> select l_quantity, l_extendedprice, l_discount, l_tax from biglineitem where l_quantity > 100.0;
where biglineitem is tpch.lineitem repeated 6 times, the codegen version
is 19% faster than the non-codegen version in scanning, and 8% faster in
query time. Codegen time in this simple case is 69ms.

Simple performance tests show that putting the parser in libUtil instead
of impala-sse.bc reduces codegen time by about two thirds in cases where
only one decimal column is parsed, while the scanning time stays nearly the same.

Change-Id: Ia65820e969d59094dc92d912a5279fa90f6b179d
Reviewed-on: http://gerrit.cloudera.org:8080/7683
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/679ebc1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/679ebc1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/679ebc1a

Branch: refs/heads/master
Commit: 679ebc1ac66e07edc0e81b34c8ed56a0a8aff0ca
Parents: 3b1a1df
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Thu Aug 17 17:35:31 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Aug 23 06:24:12 2017 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py |  5 ++-
 be/src/exec/hdfs-scanner-ir.cc        | 53 ++++++++++++++++++++++++------
 be/src/exec/hdfs-scanner.cc           |  8 -----
 be/src/exec/text-converter.cc         | 46 +++++++++++++++++++++-----
 be/src/util/CMakeLists.txt            |  1 +
 be/src/util/string-parser.cc          | 40 ++++++++++++++++++++++
 be/src/util/string-parser.h           | 11 ++++++-
 7 files changed, 136 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/679ebc1a/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index 3cc195c..668ae24 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -179,13 +179,16 @@ ir_functions = [
   ["PARQUET_SCANNER_EVAL_RUNTIME_FILTER",
    "_ZN6impala18HdfsParquetScanner17EvalRuntimeFilterEiPNS_8TupleRowE"],
   ["STRING_TO_BOOL", "IrStringToBool"],
-  ["STRING_TO_INT8", "_Z14IrStringToInt8PKciPN6impala12StringParser11ParseResultE"],
+  ["STRING_TO_INT8", "IrStringToInt8"],
   ["STRING_TO_INT16", "IrStringToInt16"],
   ["STRING_TO_INT32", "IrStringToInt32"],
   ["STRING_TO_INT64", "IrStringToInt64"],
   ["STRING_TO_FLOAT", "IrStringToFloat"],
   ["STRING_TO_DOUBLE", "IrStringToDouble"],
   ["STRING_TO_TIMESTAMP", "IrStringToTimestamp"],
+  ["STRING_TO_DECIMAL4", "IrStringToDecimal4"],
+  ["STRING_TO_DECIMAL8", "IrStringToDecimal8"],
+  ["STRING_TO_DECIMAL16", "IrStringToDecimal16"],
   ["IS_NULL_STRING", "IrIsNullString"],
   ["GENERIC_IS_NULL_STRING", "IrGenericIsNullString"],
   ["RAW_VALUE_COMPARE",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/679ebc1a/be/src/exec/hdfs-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc
index 143adb7..867ce1e 100644
--- a/be/src/exec/hdfs-scanner-ir.cc
+++ b/be/src/exec/hdfs-scanner-ir.cc
@@ -80,46 +80,79 @@ ScalarExprEvaluator* HdfsScanner::GetConjunctEval(int idx) const {
   return (*conjunct_evals_)[idx];
 }
 
+void StringToDecimalSymbolDummy() {
+  // Force the linker to link the object file containing these functions.
+  StringToDecimal4(nullptr, 0, 0, 0, nullptr);
+  StringToDecimal8(nullptr, 0, 0, 0, nullptr);
+  StringToDecimal16(nullptr, 0, 0, 0, nullptr);
+}
+
 // Define the string parsing functions for llvm.  Stamp out the templated functions
 #ifdef IR_COMPILE
+using ParseResult = StringParser::ParseResult;
 extern "C"
-bool IrStringToBool(const char* s, int len, StringParser::ParseResult* result) {
+bool IrStringToBool(const char* s, int len, ParseResult* result) {
   return StringParser::StringToBool(s, len, result);
 }
 
-int8_t IrStringToInt8(const char* s, int len, StringParser::ParseResult* result) {
+extern "C"
+int8_t IrStringToInt8(const char* s, int len, ParseResult* result) {
   return StringParser::StringToInt<int8_t>(s, len, result);
 }
 
 extern "C"
-int16_t IrStringToInt16(const char* s, int len, StringParser::ParseResult* result) {
+int16_t IrStringToInt16(const char* s, int len, ParseResult* result) {
   return StringParser::StringToInt<int16_t>(s, len, result);
 }
 
 extern "C"
-int32_t IrStringToInt32(const char* s, int len, StringParser::ParseResult* result) {
+int32_t IrStringToInt32(const char* s, int len, ParseResult* result) {
   return StringParser::StringToInt<int32_t>(s, len, result);
 }
 
 extern "C"
-int64_t IrStringToInt64(const char* s, int len, StringParser::ParseResult* result) {
+int64_t IrStringToInt64(const char* s, int len, ParseResult* result) {
   return StringParser::StringToInt<int64_t>(s, len, result);
 }
 
 extern "C"
-float IrStringToFloat(const char* s, int len, StringParser::ParseResult* result) {
+float IrStringToFloat(const char* s, int len, ParseResult* result) {
   return StringParser::StringToFloat<float>(s, len, result);
 }
 
 extern "C"
-double IrStringToDouble(const char* s, int len, StringParser::ParseResult* result) {
+double IrStringToDouble(const char* s, int len, ParseResult* result) {
   return StringParser::StringToFloat<double>(s, len, result);
 }
 
 extern "C"
-TimestampValue IrStringToTimestamp(const char* s, int len,
-    StringParser::ParseResult* result) {
-  return StringParser::StringToTimestamp(s, len, result);
+void IrStringToTimestamp(TimestampValue* out, const char* s, int len,
+    ParseResult* result) {
+  *out = StringParser::StringToTimestamp(s, len, result);
+}
+
+extern "C"
+Decimal4Value IrStringToDecimal4(const char* s, int len, int type_precision,
+    int type_scale, ParseResult* result)  {
+  auto ret = StringToDecimal4(s, len, type_precision, type_scale, result);
+  if (*result != ParseResult::PARSE_SUCCESS) *result = ParseResult::PARSE_FAILURE;
+  return ret;
+}
+
+extern "C"
+Decimal8Value IrStringToDecimal8(const char* s, int len, int type_precision,
+    int type_scale, ParseResult* result)  {
+  auto ret = StringToDecimal8(s, len, type_precision, type_scale, result);
+  if (*result != ParseResult::PARSE_SUCCESS) *result = ParseResult::PARSE_FAILURE;
+  return ret;
+}
+
+extern "C"
+Decimal16Value IrStringToDecimal16(const char* s, int len, int type_precision,
+    int type_scale, ParseResult* result)  {
+  auto ret = StringToDecimal16(s, len, type_precision, type_scale, result);
+  if (*result != ParseResult::PARSE_SUCCESS) *result = ParseResult::PARSE_FAILURE;
+  return ret;
 }
 
 extern "C"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/679ebc1a/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 0ef62ab..e81a72a 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -308,14 +308,6 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
   SCOPED_TIMER(codegen->codegen_timer());
   RuntimeState* state = node->runtime_state();
 
-  // TODO: Timestamp is not yet supported
-  for (int i = 0; i < node->materialized_slots().size(); ++i) {
-    SlotDescriptor* slot_desc = node->materialized_slots()[i];
-    if (slot_desc->type().type == TYPE_DECIMAL) {
-      return Status::Expected("Decimal not yet supported for codegen.");
-    }
-  }
-
   // Cast away const-ness.  The codegen only sets the cached typed llvm struct.
   TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc());
   vector<Function*> slot_fns;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/679ebc1a/be/src/exec/text-converter.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/text-converter.cc b/be/src/exec/text-converter.cc
index 431a11b..0cad05c 100644
--- a/be/src/exec/text-converter.cc
+++ b/be/src/exec/text-converter.cc
@@ -229,6 +229,23 @@ Status TextConverter::CodegenWriteSlot(LlvmCodeGen* codegen,
       case TYPE_TIMESTAMP:
         parse_fn_enum = IRFunction::STRING_TO_TIMESTAMP;
         break;
+      case TYPE_DECIMAL:
+        switch (slot_desc->slot_size()) {
+          case 4:
+            parse_fn_enum = IRFunction::STRING_TO_DECIMAL4;
+            break;
+          case 8:
+            parse_fn_enum = IRFunction::STRING_TO_DECIMAL8;
+            break;
+          case 16:
+            parse_fn_enum = IRFunction::STRING_TO_DECIMAL16;
+            break;
+          default:
+            DCHECK(false);
+            return Status("TextConverter::CodegenWriteSlot(): "
+                "Decimal slots can't be this size.");
+        }
+        break;
       default:
         DCHECK(false);
         return Status("TextConverter::CodegenWriteSlot(): Codegen'd parser NYI for the"
@@ -244,17 +261,22 @@ Status TextConverter::CodegenWriteSlot(LlvmCodeGen* codegen,
     LlvmCodeGen::NamedVariable parse_result("parse_result", codegen->GetType(TYPE_INT));
     Value* parse_result_ptr = codegen->CreateEntryBlockAlloca(*fn, parse_result);
 
-    Value* parse_return;
+    CallInst* parse_return;
     // Call Impala's StringTo* function
-    if (parse_fn->arg_size() == 3) {
-      parse_return = builder.CreateCall(parse_fn, {args[1], args[2], parse_result_ptr});
-    } else {
-      DCHECK(parse_fn->arg_size() == 4);
+    // Function implementations in exec/hdfs-scanner-ir.cc
+    if (slot_desc->type().type == TYPE_DECIMAL) {
+      // Special case for decimal since it has additional precision/scale parameters
+      parse_return = builder.CreateCall(parse_fn, {args[1], args[2],
+          codegen->GetIntConstant(TYPE_INT, slot_desc->type().precision),
+          codegen->GetIntConstant(TYPE_INT, slot_desc->type().scale), parse_result_ptr});
+    } else if (slot_desc->type().type == TYPE_TIMESTAMP) {
       // If the return value is large (more than 16 bytes in our toolchain) the first
       // parameter would be a pointer to value parsed and the return value of callee
       // should be ignored
-      builder.CreateCall(parse_fn, {slot, args[1], args[2], parse_result_ptr});
-      parse_return = nullptr;
+      parse_return =
+          builder.CreateCall(parse_fn, {slot, args[1], args[2], parse_result_ptr});
+    } else {
+      parse_return = builder.CreateCall(parse_fn, {args[1], args[2], parse_result_ptr});
     }
     Value* parse_result_val = builder.CreateLoad(parse_result_ptr, "parse_result");
     Value* failed_value = codegen->GetIntConstant(TYPE_INT, StringParser::PARSE_FAILURE);
@@ -274,7 +296,15 @@ Status TextConverter::CodegenWriteSlot(LlvmCodeGen* codegen,
     // Parse succeeded
     builder.SetInsertPoint(parse_success_block);
     // If the parsed value is in parse_return, move it into slot
-    if (parse_fn->arg_size() == 3) builder.CreateStore(parse_return, slot);
+    if (slot_desc->type().type == TYPE_DECIMAL) {
+      // For Decimal values, the return type generated by Clang is a struct type
+      // rather than an integer, so a bitcast is necessary.
+      Value* cast_slot = builder.CreateBitCast(slot,
+          parse_return->getType()->getPointerTo());
+      builder.CreateStore(parse_return, cast_slot);
+    } else if (slot_desc->type().type != TYPE_TIMESTAMP) {
+      builder.CreateStore(parse_return, slot);
+    }
     builder.CreateRet(codegen->true_value());
 
     // Parse failed, set slot to null and return false

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/679ebc1a/be/src/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index c38be86..3d92da7 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -70,6 +70,7 @@ add_library(Util
   redactor.cc
   runtime-profile.cc
   simple-logger.cc
+  string-parser.cc
   symbols-util.cc
   static-asserts.cc
   summary-util.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/679ebc1a/be/src/util/string-parser.cc
----------------------------------------------------------------------
diff --git a/be/src/util/string-parser.cc b/be/src/util/string-parser.cc
new file mode 100644
index 0000000..f59c6b88
--- /dev/null
+++ b/be/src/util/string-parser.cc
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/string-parser.h"
+
+namespace impala {
+
+using ParseResult = StringParser::ParseResult;
+Decimal4Value StringToDecimal4(const char* s, int len, int type_precision,
+    int type_scale, StringParser::ParseResult* result) {
+  return StringParser::StringToDecimal<int32_t>(s, len, type_precision,
+      type_scale, result);
+}
+
+Decimal8Value StringToDecimal8(const char* s, int len, int type_precision,
+    int type_scale, StringParser::ParseResult* result) {
+  return StringParser::StringToDecimal<int64_t>(s, len, type_precision,
+      type_scale, result);
+}
+
+Decimal16Value StringToDecimal16(const char* s, int len, int type_precision,
+    int type_scale, StringParser::ParseResult* result) {
+  return StringParser::StringToDecimal<int128_t>(s, len, type_precision,
+      type_scale, result);
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/679ebc1a/be/src/util/string-parser.h
----------------------------------------------------------------------
diff --git a/be/src/util/string-parser.h b/be/src/util/string-parser.h
index d6b94c7..10c3fc7 100644
--- a/be/src/util/string-parser.h
+++ b/be/src/util/string-parser.h
@@ -598,6 +598,15 @@ inline int StringParser::StringParseTraits<int32_t>::max_ascii_len() { return 10
 template<>
 inline int StringParser::StringParseTraits<int64_t>::max_ascii_len() { return 19; }
 
-}
+// These functions are too large to benefit from inlining.
+Decimal4Value StringToDecimal4(const char* s, int len, int type_precision,
+    int type_scale, StringParser::ParseResult* result);
+
+Decimal8Value StringToDecimal8(const char* s, int len, int type_precision,
+    int type_scale, StringParser::ParseResult* result);
 
+Decimal16Value StringToDecimal16(const char* s, int len, int type_precision,
+    int type_scale, StringParser::ParseResult* result);
+
+}
 #endif


[4/6] incubator-impala git commit: IMPALA-4990: fix run_tests.py --update_results

Posted by ta...@apache.org.
IMPALA-4990: fix run_tests.py --update_results

Seems to have broken with some recent commits.

Change-Id: I9c22e197662228158d7935ebfb12d9b3691eb499
Reviewed-on: http://gerrit.cloudera.org:8080/6151
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5f323124
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5f323124
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5f323124

Branch: refs/heads/master
Commit: 5f323124ae4e68e4022f0d7be5d0872988124eb0
Parents: 57dae5e
Author: Dan Hecht <dh...@cloudera.com>
Authored: Fri Feb 24 15:07:24 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Aug 23 04:49:13 2017 +0000

----------------------------------------------------------------------
 tests/common/test_result_verifier.py | 2 +-
 tests/util/test_file_parser.py       | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5f323124/tests/common/test_result_verifier.py
----------------------------------------------------------------------
diff --git a/tests/common/test_result_verifier.py b/tests/common/test_result_verifier.py
index 7e929f1..a22e0d4 100644
--- a/tests/common/test_result_verifier.py
+++ b/tests/common/test_result_verifier.py
@@ -404,7 +404,7 @@ def verify_raw_results(test_section, exec_result, file_format, update_section=Fa
     VERIFIER_MAP[verifier](expected, actual)
   except AssertionError:
     if update_section:
-      test_section[results_section] = join_section_lines(actual.result_list)
+      test_section[result_section] = join_section_lines(actual.result_list)
     else:
       raise
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5f323124/tests/util/test_file_parser.py
----------------------------------------------------------------------
diff --git a/tests/util/test_file_parser.py b/tests/util/test_file_parser.py
index 9850b18..89149ab 100644
--- a/tests/util/test_file_parser.py
+++ b/tests/util/test_file_parser.py
@@ -291,7 +291,8 @@ def write_test_file(test_file_name, test_file_sections, encoding=None):
         if section_name == 'RESULTS' and test_case.get('VERIFIER'):
           full_section_name = '%s: %s' % (section_name, test_case['VERIFIER'])
         test_file_text.append("%s %s" % (SUBSECTION_DELIMITER, full_section_name))
-        if test_case[section_name].strip():
-          test_file_text.append(test_case[section_name])
+        section_value = ''.join(test_case[section_name])
+        if section_value.strip():
+          test_file_text.append(section_value)
     test_file_text.append(SECTION_DELIMITER)
     test_file.write(('\n').join(test_file_text))