Posted to commits@impala.apache.org by ta...@apache.org on 2016/07/05 20:37:37 UTC

[5/8] incubator-impala git commit: IMPALA-3680: Cleanup the scan range state after failed hdfs cache reads

IMPALA-3680: Cleanup the scan range state after failed hdfs cache reads

Currently we don't reset the file read offset if a zero-copy
read (ZCR) fails. Because of this, when we switch to the normal
read path, we hit the end of the scan range (eosr) before
reading the expected data length. If both the ReadFromCache()
and ReadRange() calls fail without reading any data, we end up
creating a whole list of scan ranges, each of size 1KB
(DEFAULT_READ_PAST_SIZE), on the assumption that we are reading
past the scan range. This causes a huge performance hit. This
patch calls ScanRange::Close() after a failed cache read to
clean up the file system state so that re-reads start from the
beginning of the scan range.
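
To illustrate the failure mode, here is a minimal Python sketch. The
class and method names below are hypothetical stand-ins, not Impala's
actual C++ API: a failed cached read leaves the file offset past the
range, so without a cleanup step the fallback read starts at eosr and
returns nothing.

class ScanRange(object):
  """Toy model of a scan range [offset, offset + length) over a file."""

  def __init__(self, path, offset, length):
    self.path = path
    self.offset = offset
    self.length = length
    self.file_ = open(path, "rb")
    self.file_.seek(offset)

  def read_from_cache(self):
    # Stand-in for a zero-copy cached read that fails *after* having
    # advanced the file offset -- the state this patch cleans up.
    self.file_.seek(self.offset + self.length)
    return None

  def close(self):
    # Mirrors the intent of ScanRange::Close(): tear down file system
    # state so that a re-read starts from the beginning of the range.
    self.file_.close()
    self.file_ = open(self.path, "rb")
    self.file_.seek(self.offset)

  def read(self):
    data = self.read_from_cache()
    if data is None:
      # Without this close(), file_.read() below would start at eosr and
      # return b"", mimicking the tiny read-past ranges described above.
      self.close()
      data = self.file_.read(self.length)
    return data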

This was hit while debugging IMPALA-3679, where queries on
1GB of cached data ran ~20x slower than non-cached runs.

Change-Id: I0a9ea19dd8571b01d2cd5b87da1c259219f6297a
Reviewed-on: http://gerrit.cloudera.org:8080/3313
Reviewed-by: Michael Brown <mi...@cloudera.com>
Tested-by: Bharath Vissapragada <bh...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d1975166
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d1975166
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d1975166

Branch: refs/heads/master
Commit: d19751669acb5b5886792d8606d5dca7ee5b30a8
Parents: 08e8de7
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Mon Jun 6 03:46:11 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue Jul 5 13:37:26 2016 -0700

----------------------------------------------------------------------
 be/src/runtime/disk-io-mgr-scan-range.cc        | 11 ++-
 .../common/etc/hadoop/conf/hdfs-site.xml.tmpl   |  2 +-
 tests/query_test/test_hdfs_caching.py           | 75 +++++++++++++++++++-
 3 files changed, 83 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d1975166/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 25399bc..9a7e39a 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -423,9 +423,14 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) {
     DCHECK(cached_buffer_ == NULL);
     cached_buffer_ = hadoopReadZero(hdfs_file_->file(),
         io_mgr_->cached_read_options_, len());
-
-    // Data was not cached, caller will fall back to normal read path.
-    if (cached_buffer_ == NULL) return Status::OK();
+  }
+  // Data was not cached, caller will fall back to normal read path.
+  if (cached_buffer_ == NULL) {
+    VLOG_QUERY << "Cache read failed for scan range: " << DebugString()
+        << ". Switching to disk read path.";
+    // Clean up the scan range state before re-issuing it.
+    Close();
+    return Status::OK();
   }
 
   // Cached read succeeded.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d1975166/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
----------------------------------------------------------------------
diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl b/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
index 740bebd..575fc0a 100644
--- a/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
+++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
@@ -82,7 +82,7 @@
     <value>134217728</value>
   </property>
 
-  <!-- Set the max cached memory to ~64kb. This must be less than ulimit -1 -->
+  <!-- Set the max cached memory to ~64kb. This must be less than ulimit -l -->
   <property>
     <name>dfs.datanode.max.locked.memory</name>
     <value>64000</value>
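
As the corrected comment notes, dfs.datanode.max.locked.memory must stay
below the datanode's memlock limit. A quick sanity-check sketch (not part
of this patch; note that getrlimit() reports bytes while `ulimit -l`
prints kilobytes):

import resource

# Value from hdfs-site.xml.tmpl above, in bytes.
MAX_LOCKED_MEMORY = 64000

soft, _hard = resource.getrlimit(resource.RLIMIT_MEMLOCK)
if soft != resource.RLIM_INFINITY and MAX_LOCKED_MEMORY > soft:
  raise SystemExit("dfs.datanode.max.locked.memory (%d bytes) exceeds the "
      "memlock soft limit (%d bytes); raise it with ulimit -l."
      % (MAX_LOCKED_MEMORY, soft))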

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d1975166/tests/query_test/test_hdfs_caching.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_hdfs_caching.py b/tests/query_test/test_hdfs_caching.py
index 10a9e2a..e275cbd 100644
--- a/tests/query_test/test_hdfs_caching.py
+++ b/tests/query_test/test_hdfs_caching.py
@@ -5,13 +5,15 @@ import logging
 import os
 import pytest
 from copy import copy
-from subprocess import call
+from subprocess import check_call, call
+import time
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.impala_test_suite import *
 from tests.common.test_vector import *
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal
+from tests.util.filesystem_utils import get_fs_path
 from tests.util.shell_util import exec_process
 
 # End to end test that hdfs caching is working.
@@ -87,6 +89,67 @@ class TestHdfsCaching(ImpalaTestSuite):
       result = self.execute_query(query_string)
       assert(len(result.data) == 2)
 
+# A separate class has been created for "test_hdfs_caching_fallback_path" to make it
+# run as part of the exhaustive tests, which require the workload to be 'functional-query'.
+# TODO: Move this to TestHdfsCaching once we make exhaustive tests run for other workloads
+@SkipIfS3.caching
+@SkipIfIsilon.caching
+@SkipIfLocal.caching
+class TestHdfsCachingFallbackPath(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @SkipIfS3.hdfs_encryption
+  @SkipIfIsilon.hdfs_encryption
+  @SkipIfLocal.hdfs_encryption
+  def test_hdfs_caching_fallback_path(self, vector, unique_database, testid_checksum):
+    """ This tests the code path of the query execution where the hdfs cache read fails
+    and the execution falls back to the normal read path. To reproduce this situation we
+    rely on IMPALA-3679, where zcrs are not supported with encryption zones. This makes
+    sure ReadFromCache() fails and falls back to ReadRange() to read the scan range."""
+
+    if self.exploration_strategy() != 'exhaustive' or\
+        vector.get_value('table_format').file_format != 'text':
+      pytest.skip()
+
+    # Create a new encryption zone and copy the tpch.nation table data into it.
+    encrypted_table_dir = get_fs_path("/test-warehouse/" + testid_checksum)
+    create_query_sql = "CREATE EXTERNAL TABLE %s.cached_nation like tpch.nation "\
+        "LOCATION '%s'" % (unique_database, encrypted_table_dir)
+    check_call(["hdfs", "dfs", "-mkdir", encrypted_table_dir], shell=False)
+    check_call(["hdfs", "crypto", "-createZone", "-keyName", "testKey1", "-path",\
+        encrypted_table_dir], shell=False)
+    check_call(["hdfs", "dfs", "-cp", get_fs_path("/test-warehouse/tpch.nation/*.tbl"),\
+        encrypted_table_dir], shell=False)
+    # Reduce the scan range size to force the query to have multiple scan ranges.
+    exec_options = vector.get_value('exec_option')
+    exec_options['max_scan_range_length'] = 1024
+    try:
+      self.execute_query_expect_success(self.client, create_query_sql)
+      # Cache the table data
+      self.execute_query_expect_success(self.client, "ALTER TABLE %s.cached_nation set "
+         "cached in 'testPool'" % unique_database)
+      # Wait till the whole path is cached. We set a deadline of 20 seconds for the path
+      # to be cached to make sure this doesn't loop forever in case of caching errors.
+      caching_deadline = time.time() + 20
+      while not is_path_fully_cached(encrypted_table_dir):
+        if time.time() > caching_deadline:
+          pytest.fail("Timed out caching path: " + encrypted_table_dir)
+        time.sleep(2)
+      self.execute_query_expect_success(self.client, "invalidate metadata "
+          "%s.cached_nation" % unique_database);
+      result = self.execute_query_expect_success(self.client, "select count(*) from "
+          "%s.cached_nation" % unique_database, exec_options)
+      assert(len(result.data) == 1)
+      assert(result.data[0] == '25')
+    except Exception as e:
+      pytest.fail("Failure in test_hdfs_caching_fallback_path: " + str(e))
+    finally:
+      check_call(["hdfs", "dfs", "-rm", "-r", "-f", "-skipTrash", encrypted_table_dir],\
+          shell=False)
+
+
 @SkipIfS3.caching
 @SkipIfIsilon.caching
 @SkipIfLocal.caching
@@ -180,6 +243,16 @@ def drop_cache_directives_for_path(path):
   assert rc == 0, \
       "Error removing cache directive for path %s (%s, %s)" % (path, stdout, stderr)
 
+def is_path_fully_cached(path):
+  """Returns true if all the bytes of the path are cached, false otherwise"""
+  rc, stdout, stderr = exec_process("hdfs cacheadmin -listDirectives -stats -path %s" % path)
+  assert rc == 0
+  caching_stats = stdout.strip("\n").split("\n")[-1].split()
+  # Compare BYTES_NEEDED and BYTES_CACHED, the output format is as follows
+  # "ID POOL REPL EXPIRY PATH BYTES_NEEDED BYTES_CACHED FILES_NEEDED FILES_CACHED"
+  return len(caching_stats) > 0 and caching_stats[5] == caching_stats[6]
+
+
 def get_cache_directive_for_path(path):
   rc, stdout, stderr = exec_process("hdfs cacheadmin -listDirectives -path %s" % path)
   assert rc == 0
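
For reference, here is how the column parsing in is_path_fully_cached()
behaves on a sample -stats listing. The values below are made up for
illustration; the column order follows the comment in the function above.

sample_stdout = """Found 1 entry
 ID POOL     REPL EXPIRY PATH                BYTES_NEEDED BYTES_CACHED FILES_NEEDED FILES_CACHED
  1 testPool    1 never  /test-warehouse/foo         2199         2199            1            1
"""
caching_stats = sample_stdout.strip("\n").split("\n")[-1].split()
# ['1', 'testPool', '1', 'never', '/test-warehouse/foo', '2199', '2199', '1', '1']
assert caching_stats[5] == caching_stats[6]  # BYTES_NEEDED == BYTES_CACHED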