You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/05/03 21:26:10 UTC

[impala] branch master updated (99e1a39 -> 3fb3657)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 99e1a39  Bump CDP_BUILD_NUMBER to 1056671
     new f5e89d6  IMPALA-8477: [DOCS] SHOW GRANT GROUP for Ranger authorization
     new 2ece4c9  IMPALA-8341: Data cache for remote reads
     new 84addd2  IMPALA-8485: Authorization policy file clean up
     new 3fb3657  IMPALA-5351: Support storing column comment of kudu table

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/hdfs-scan-node-base.cc                 |  17 +
 be/src/exec/hdfs-scan-node-base.h                  |   7 +
 be/src/runtime/io/CMakeLists.txt                   |   2 +
 be/src/runtime/io/data-cache-test.cc               | 489 ++++++++++++++
 be/src/runtime/io/data-cache.cc                    | 707 +++++++++++++++++++++
 be/src/runtime/io/data-cache.h                     | 354 +++++++++++
 be/src/runtime/io/disk-io-mgr.cc                   |  13 +
 be/src/runtime/io/disk-io-mgr.h                    |  16 +
 be/src/runtime/io/hdfs-file-reader.cc              |  86 ++-
 be/src/runtime/io/hdfs-file-reader.h               |  38 +-
 be/src/runtime/io/request-context.h                |  27 +
 be/src/util/filesystem-util-test.cc                |  36 ++
 be/src/util/filesystem-util.cc                     | 133 +++-
 be/src/util/filesystem-util.h                      |  37 +-
 be/src/util/impalad-metrics.cc                     |  22 +-
 be/src/util/impalad-metrics.h                      |  18 +-
 bin/create-test-configuration.sh                   |   1 -
 bin/start-impala-cluster.py                        |  19 +-
 common/thrift/metrics.json                         |  40 ++
 docs/topics/impala_show.xml                        |  76 +--
 fe/.gitignore                                      |   3 -
 .../impala/analysis/AlterTableAlterColStmt.java    |   4 -
 .../java/org/apache/impala/catalog/KuduColumn.java |   7 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  22 +-
 .../impala/service/KuduCatalogOpExecutor.java      |  12 +-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  15 +-
 .../queries/QueryTest/data-cache.test              |  49 ++
 .../queries/QueryTest/kudu_describe.test           |   2 +-
 tests/authorization/test_authorization.py          |   6 +-
 tests/common/custom_cluster_test_suite.py          |   4 +-
 tests/common/impala_test_suite.py                  |  24 +
 tests/custom_cluster/test_data_cache.py            |  43 ++
 tests/custom_cluster/test_krpc_metrics.py          |  24 -
 tests/metadata/test_ddl.py                         |  42 ++
 tests/metadata/test_ddl_base.py                    |   2 +-
 tests/query_test/test_kudu.py                      |  15 +-
 36 files changed, 2287 insertions(+), 125 deletions(-)
 create mode 100644 be/src/runtime/io/data-cache-test.cc
 create mode 100644 be/src/runtime/io/data-cache.cc
 create mode 100644 be/src/runtime/io/data-cache.h
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/data-cache.test
 create mode 100644 tests/custom_cluster/test_data_cache.py


[impala] 04/04: IMPALA-5351: Support storing column comment of kudu table

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3fb36570ae0c7329cdbe3640515bc3e0cb066c81
Author: helifu <hz...@corp.netease.com>
AuthorDate: Tue Apr 9 21:22:14 2019 +0800

    IMPALA-5351: Support storing column comment of kudu table
    
    This patch intends to support storing column comment of kudu table
    on impala side.
    
    The tests below passed:
    1) create kudu-table with column comment;
    2) alter kudu-table with (add/alter[delete] column comment);
    3) show create kudu table;
    4) describe kudu-table;
    5) invalidate metadata;
    6) comment on column is { '' | null | 'comment' }
    
    Change-Id: Ifb3b37eed364f12bdb3c1d7ef5be128f1475936c
    Reviewed-on: http://gerrit.cloudera.org:8080/12977
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/analysis/AlterTableAlterColStmt.java    |  4 ---
 .../java/org/apache/impala/catalog/KuduColumn.java |  7 ++--
 .../apache/impala/service/CatalogOpExecutor.java   | 22 ++++++++----
 .../impala/service/KuduCatalogOpExecutor.java      | 12 +++++--
 .../org/apache/impala/analysis/AnalyzeDDLTest.java | 15 ++++----
 .../queries/QueryTest/kudu_describe.test           |  2 +-
 tests/metadata/test_ddl.py                         | 42 ++++++++++++++++++++++
 tests/metadata/test_ddl_base.py                    |  2 +-
 tests/query_test/test_kudu.py                      | 15 ++++++--
 9 files changed, 96 insertions(+), 25 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
index 6b43bfd..5e6b5cd 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
@@ -165,10 +165,6 @@ public class AlterTableAlterColStmt extends AlterTableStmt {
             "Cannot %s default value for primary key column '%s'",
             isDropDefault_ ? "drop" : "set", colName_));
       }
-      if (newColDef_.getComment() != null) {
-        // IMPALA-5351
-        throw new AnalysisException("Kudu does not support column comments.");
-      }
       if (newColDef_.isPrimaryKey()) {
         throw new AnalysisException(
             "Altering a column to be a primary key is not supported.");
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
index a4b8e5f..246b0e3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
@@ -85,10 +85,11 @@ public class KuduColumn extends Column {
       }
       Preconditions.checkNotNull(defaultValueExpr);
     }
+    String comment = !colSchema.getComment().isEmpty() ? colSchema.getComment() : null;
     return new KuduColumn(colSchema.getName(), type, colSchema.isKey(),
         colSchema.isNullable(), colSchema.getEncoding(),
         colSchema.getCompressionAlgorithm(), defaultValueExpr,
-        colSchema.getDesiredBlockSize(), null, position);
+        colSchema.getDesiredBlockSize(), comment, position);
   }
 
   public static KuduColumn fromThrift(TColumn column, int position)
@@ -111,8 +112,10 @@ public class KuduColumn extends Column {
     }
     int blockSize = 0;
     if (column.isSetBlock_size()) blockSize = column.getBlock_size();
+    String comment = (column.isSetComment() && !column.getComment().isEmpty()) ?
+        column.getComment() : null;
     return new KuduColumn(column.getKudu_column_name(), columnType, column.isIs_key(),
-        column.isIs_nullable(), encoding, compression, defaultValue, blockSize, null,
+        column.isIs_nullable(), encoding, compression, defaultValue, blockSize, comment,
         position);
   }
 
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 3bf2434..80859fa 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -4021,15 +4021,23 @@ public class CatalogOpExecutor {
     try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
-      org.apache.hadoop.hive.metastore.api.Table msTbl =
-          tbl.getMetaStoreTable().deepCopy();
-      if (!updateColumnComment(msTbl.getSd().getColsIterator(), columnName, comment)) {
-        if (!updateColumnComment(msTbl.getPartitionKeysIterator(), columnName, comment)) {
-          throw new ColumnNotFoundException(String.format(
-              "Column name %s not found in table %s.", columnName, tbl.getFullName()));
+      if (tbl instanceof KuduTable) {
+        TColumn new_col = new TColumn(columnName,
+            tbl.getColumn(columnName).getType().toThrift());
+        new_col.setComment(comment != null ? comment : "");
+        KuduCatalogOpExecutor.alterColumn((KuduTable) tbl, columnName, new_col);
+      } else {
+        org.apache.hadoop.hive.metastore.api.Table msTbl =
+            tbl.getMetaStoreTable().deepCopy();
+        if (!updateColumnComment(msTbl.getSd().getColsIterator(), columnName, comment)) {
+          if (!updateColumnComment(msTbl.getPartitionKeysIterator(), columnName,
+              comment)) {
+            throw new ColumnNotFoundException(String.format(
+                "Column name %s not found in table %s.", columnName, tbl.getFullName()));
+          }
         }
+        applyAlterTable(msTbl, true);
       }
-      applyAlterTable(msTbl, true);
       loadTableMetadata(tbl, newCatalogVersion, false, true, null);
       addTableToCatalogUpdate(tbl, response.result);
       addSummary(response, "Column has been altered.");
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index a7fb142..f4477b4 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -131,6 +131,9 @@ public class KuduCatalogOpExecutor {
       csb.typeAttributes(
           DecimalUtil.typeAttributes(type.getPrecision(), type.getDecimalDigits()));
     }
+    if (column.isSetComment() && !column.getComment().isEmpty()) {
+      csb.comment(column.getComment());
+    }
     return csb.build();
   }
 
@@ -292,7 +295,10 @@ public class KuduCatalogOpExecutor {
         }
         Type type =
             KuduUtil.toImpalaType(colSchema.getType(), colSchema.getTypeAttributes());
-        cols.add(new FieldSchema(colSchema.getName(), type.toSql().toLowerCase(), null));
+        String comment =
+            !colSchema.getComment().isEmpty() ? colSchema.getComment() : null;
+        cols.add(new FieldSchema(colSchema.getName(), type.toSql().toLowerCase(),
+            comment));
       }
     } catch (Exception e) {
       throw new ImpalaRuntimeException(String.format("Error loading schema of table " +
@@ -459,7 +465,6 @@ public class KuduCatalogOpExecutor {
       throws ImpalaRuntimeException {
     Preconditions.checkState(!Strings.isNullOrEmpty(colName));
     Preconditions.checkNotNull(newCol);
-    Preconditions.checkState(!newCol.isSetComment());
     Preconditions.checkState(!newCol.isIs_key());
     Preconditions.checkState(!newCol.isSetIs_nullable());
     if (LOG.isTraceEnabled()) {
@@ -495,6 +500,9 @@ public class KuduCatalogOpExecutor {
     if (!newColName.toLowerCase().equals(colName.toLowerCase())) {
       alterTableOptions.renameColumn(kuduColName, newColName);
     }
+    if (newCol.isSetComment()) {
+      alterTableOptions.changeComment(kuduColName, newCol.getComment());
+    }
 
     String errMsg = String.format(
         "Error altering column %s in Kudu table %s", colName, tbl.getName());
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 64c7a99..02d0b84 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -1479,6 +1479,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "compression LZ4 encoding RLE");
     AnalyzesOk("alter table functional.alltypes alter int_col set comment 'a'");
     AnalyzesOk("alter table functional_kudu.alltypes alter int_col drop default");
+    AnalyzesOk("alter table functional_kudu.alltypes alter int_col set comment 'a'");
 
     AnalysisError("alter table functional_kudu.alltypes alter id set default 0",
         "Cannot set default value for primary key column 'id'");
@@ -1493,8 +1494,6 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "Altering a column to be a primary key is not supported.");
     AnalysisError("alter table functional_kudu.alltypes alter int_col set not null",
         "Altering the nullability of a column is not supported.");
-    AnalysisError("alter table functional_kudu.alltypes alter int_col set comment 'a'",
-        "Kudu does not support column comments.");
     AnalysisError("alter table functional.alltypes alter int_col set compression lz4",
         "Unsupported column options for non-Kudu table: 'int_col INT COMPRESSION LZ4'");
     AnalysisError("alter table functional.alltypes alter int_col drop default",
@@ -2604,14 +2603,13 @@ public class AnalyzeDDLTest extends FrontendTestBase {
 
     // ALTER TABLE CHANGE COLUMN on Kudu tables
     AnalyzesOk("alter table functional_kudu.testtbl change column name new_name string");
+    AnalyzesOk("alter table functional_kudu.testtbl change column zip " +
+        "zip int comment 'comment'");
     // Unsupported column options
     AnalysisError("alter table functional_kudu.testtbl change column zip zip_code int " +
         "encoding rle compression lz4 default 90000", "Unsupported column options in " +
         "ALTER TABLE CHANGE COLUMN statement: 'zip_code INT ENCODING RLE COMPRESSION " +
         "LZ4 DEFAULT 90000'. Use ALTER TABLE ALTER COLUMN instead.");
-    AnalysisError(
-        "alter table functional_kudu.testtbl change column zip zip int comment 'comment'",
-        "Kudu does not support column comments.");
     // Changing the column type is not supported for Kudu tables
     AnalysisError("alter table functional_kudu.testtbl change column zip zip bigint",
         "Cannot change the type of a Kudu column using an ALTER TABLE CHANGE COLUMN " +
@@ -3019,6 +3017,10 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "ts timestamp not null default '2009-1 foo') " +
         "partition by hash(id) partitions 3 stored as kudu",
         "String '2009-1 foo' cannot be cast to a TIMESTAMP literal.");
+
+    // Test column comments.
+    AnalyzesOk("create table tab (x int comment 'x', y int comment 'y', " +
+        "primary key (x, y)) stored as kudu");
   }
 
   @Test
@@ -4325,7 +4327,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         new Pair<>("functional.alltypes.id", createAnalysisCtx()),
         new Pair<>("alltypes.id", createAnalysisCtx("functional")),
         new Pair<>("functional.alltypes_view.id", createAnalysisCtx()),
-        new Pair<>("alltypes_view.id", createAnalysisCtx("functional"))}) {
+        new Pair<>("alltypes_view.id", createAnalysisCtx("functional")),
+        new Pair<>("functional_kudu.alltypes.id", createAnalysisCtx())}) {
       AnalyzesOk(String.format("comment on column %s is 'comment'", pair.first),
           pair.second);
       AnalyzesOk(String.format("comment on column %s is ''", pair.first), pair.second);
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_describe.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_describe.test
index daaa18f..8b8e80f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_describe.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_describe.test
@@ -39,7 +39,7 @@ NAME,TYPE,COMMENT,PRIMARY_KEY,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_
 'pk1','int','','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
 'pk2','int','','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
 'pk3','string','','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'c1','string','','false','true','abc','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'c1','string','testing','false','true','abc','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
 'c2','int','','false','false','100','PLAIN_ENCODING','SNAPPY','0'
 'c3','int','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','8388608'
 ---- TYPES
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index c8871c5..05d8c01 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -728,6 +728,48 @@ class TestDdlStatements(TestDdlBase):
       self.client, "show create table {0}".format(orc_table))
     assert any("ORC" in x for x in result.data)
 
+  def test_kudu_column_comment(self, vector, unique_database):
+    table = "{0}.kudu_table0".format(unique_database)
+    self.client.execute("create table {0}(x int comment 'x' primary key) \
+                        stored as kudu".format(table))
+    comment = self._get_column_comment(table, 'x')
+    assert "x" == comment
+
+    table = "{0}.kudu_table".format(unique_database)
+    self.client.execute("create table {0}(i int primary key) stored as kudu"
+                        .format(table))
+    comment = self._get_column_comment(table, 'i')
+    assert "" == comment
+
+    self.client.execute("comment on column {0}.i is 'comment1'".format(table))
+    comment = self._get_column_comment(table, 'i')
+    assert "comment1" == comment
+
+    self.client.execute("comment on column {0}.i is ''".format(table))
+    comment = self._get_column_comment(table, 'i')
+    assert "" == comment
+
+    self.client.execute("comment on column {0}.i is 'comment2'".format(table))
+    comment = self._get_column_comment(table, 'i')
+    assert "comment2" == comment
+
+    self.client.execute("comment on column {0}.i is null".format(table))
+    comment = self._get_column_comment(table, 'i')
+    assert "" == comment
+
+    self.client.execute("alter table {0} alter column i set comment 'comment3'"
+                        .format(table))
+    comment = self._get_column_comment(table, 'i')
+    assert "comment3" == comment
+
+    self.client.execute("alter table {0} alter column i set comment ''".format(table))
+    comment = self._get_column_comment(table, 'i')
+    assert "" == comment
+
+    self.client.execute("alter table {0} add columns (j int comment 'comment4')"
+                        .format(table))
+    comment = self._get_column_comment(table, 'j')
+    assert "comment4" == comment
 
 # IMPALA-2002: Tests repeated adding/dropping of .jar and .so in the lib cache.
 class TestLibCache(TestDdlBase):
diff --git a/tests/metadata/test_ddl_base.py b/tests/metadata/test_ddl_base.py
index a27aa1c..83399b0 100644
--- a/tests/metadata/test_ddl_base.py
+++ b/tests/metadata/test_ddl_base.py
@@ -119,7 +119,7 @@ class TestDdlBase(ImpalaTestSuite):
     comments = dict()
     for row in result.data:
       cols = row.split('\t')
-      if len(cols) == 3:
+      if len(cols) <= 9:
         comments[cols[0].rstrip()] = cols[2].rstrip()
     return comments.get(col_name)
 
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index 67ba8e2..776486c 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -776,6 +776,7 @@ class TestCreateExternalTable(KuduTestSuite):
         kudu_client.delete_table(table_name)
 
 class TestShowCreateTable(KuduTestSuite):
+  column_properties = "ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION"
 
   def assert_show_create_equals(self, cursor, create_sql, show_create_sql):
     """Executes 'create_sql' to create a table, then runs "SHOW CREATE TABLE" and checks
@@ -790,7 +791,6 @@ class TestShowCreateTable(KuduTestSuite):
         textwrap.dedent(show_create_sql.format(**format_args)).strip()
 
   def test_primary_key_and_distribution(self, cursor):
-    # TODO: Add test cases with column comments once KUDU-1711 is fixed.
     # TODO: Add case with BLOCK_SIZE
     self.assert_show_create_equals(cursor,
         """
@@ -877,7 +877,18 @@ class TestShowCreateTable(KuduTestSuite):
         STORED AS KUDU
         TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
             db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
-
+    self.assert_show_create_equals(cursor,
+        """
+        CREATE TABLE {table} (c INT COMMENT 'Ab 1@' PRIMARY KEY) STORED AS KUDU""",
+        """
+        CREATE TABLE {db}.{{table}} (
+          c INT NOT NULL {p} COMMENT 'Ab 1@',
+          PRIMARY KEY (c)
+        )
+        STORED AS KUDU
+        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
+            db=cursor.conn.db_name, p=self.column_properties,
+            kudu_addr=KUDU_MASTER_HOSTS))
 
   def test_timestamp_default_value(self, cursor):
     create_sql_fmt = """


[impala] 03/04: IMPALA-8485: Authorization policy file clean up

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 84addd2a4b74454bd929d6b2ada0f501a2c6b0cb
Author: Austin Nobis <an...@cloudera.com>
AuthorDate: Thu May 2 23:53:48 2019 -0500

    IMPALA-8485: Authorization policy file clean up
    
    This patch cleans up references to the deprecated authorization_policy_file
    flag. The authz-policy.ini file is no longer created during the test config
    creation. The reference is also removed from the gitignore.
    
    Testing:
    - All FE tests were run
    - All authorization E2E tests were run
    - test_authorization.py E2E test was updated to no longer have
      references to the authz-policy.ini file.
    
    Change-Id: Ib1e90973cb3d5b243844d379e5cdcb2add4eec75
    Reviewed-on: http://gerrit.cloudera.org:8080/13222
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 bin/create-test-configuration.sh          | 1 -
 fe/.gitignore                             | 3 ---
 tests/authorization/test_authorization.py | 6 ++----
 3 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/bin/create-test-configuration.sh b/bin/create-test-configuration.sh
index 936ab9e..208d4f8 100755
--- a/bin/create-test-configuration.sh
+++ b/bin/create-test-configuration.sh
@@ -196,7 +196,6 @@ fi
 
 generate_config log4j.properties.template log4j.properties
 generate_config hbase-site.xml.template hbase-site.xml
-generate_config authz-policy.ini.template authz-policy.ini
 
 $IMPALA_HOME/bin/generate_xml_config.py sentry-site.xml.py sentry-site.xml
 for SENTRY_VARIANT in oo oo_nogrant no_oo ; do
diff --git a/fe/.gitignore b/fe/.gitignore
index 08d2c7f..fbade49 100644
--- a/fe/.gitignore
+++ b/fe/.gitignore
@@ -28,9 +28,6 @@ src/test/resources/hbase-site.xml
 # Generated core-site.xml file
 src/test/resources/core-site.xml
 
-# Generated authorization policy file
-src/test/resources/authz-policy.ini
-
 # Generated minicluster config
 src/test/resources/minicluster-conf.xml
 
diff --git a/tests/authorization/test_authorization.py b/tests/authorization/test_authorization.py
index a7e7361..f8ff36f 100644
--- a/tests/authorization/test_authorization.py
+++ b/tests/authorization/test_authorization.py
@@ -41,7 +41,6 @@ from tests.common.file_utils import assert_file_in_dir_contains,\
 from tests.hs2.hs2_test_suite import operation_id_to_query_id
 from tests.util.filesystem_utils import WAREHOUSE
 
-AUTH_POLICY_FILE = "%s/authz-policy.ini" % WAREHOUSE
 SENTRY_CONFIG_DIR = os.getenv('IMPALA_HOME') + '/fe/src/test/resources/'
 SENTRY_CONFIG_FILE = SENTRY_CONFIG_DIR + 'sentry-site.xml'
 
@@ -230,11 +229,10 @@ class TestAuthorization(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--server_name=server1\
-        --authorization_policy_file=%s\
         --authorized_proxy_user_config=foo=bar\
         --authorized_proxy_group_config=foo=bar\
         --abort_on_failed_audit_event=false\
-        --audit_event_log_dir=%s" % (AUTH_POLICY_FILE, AUDIT_LOG_DIR))
+        --audit_event_log_dir=%s" % (AUDIT_LOG_DIR))
   def test_no_matching_user_and_group_impersonation(self):
     open_session_req = TCLIService.TOpenSessionReq()
     open_session_req.username = 'hue'
@@ -387,7 +385,7 @@ class TestAuthorization(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--server_name=server1\
-      --authorization_policy_file=%s" % (AUTH_POLICY_FILE),
+      --authorization_policy_file=ignored_file",
       impala_log_dir=tempfile.mkdtemp(prefix="test_deprecated_",
       dir=os.getenv("LOG_DIR")))
   def test_deprecated_flags(self):


[impala] 01/04: IMPALA-8477: [DOCS] SHOW GRANT GROUP for Ranger authorization

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f5e89d6239eb7dbfa0acedd1704bcd398a197f9f
Author: Alex Rodoni <ar...@cloudera.com>
AuthorDate: Thu May 2 16:05:38 2019 -0700

    IMPALA-8477: [DOCS] SHOW GRANT GROUP for Ranger authorization
    
    Change-Id: Iadf0d5c8b43809880f194e0bc810df06bfab2075
    Reviewed-on: http://gerrit.cloudera.org:8080/13220
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Austin Nobis <an...@cloudera.com>
    Reviewed-by: Fredy Wijaya <fw...@cloudera.com>
---
 docs/topics/impala_show.xml | 76 +++++++++++++++++++++++----------------------
 1 file changed, 39 insertions(+), 37 deletions(-)

diff --git a/docs/topics/impala_show.xml b/docs/topics/impala_show.xml
index c7e6e0e..86c280a 100644
--- a/docs/topics/impala_show.xml
+++ b/docs/topics/impala_show.xml
@@ -33,16 +33,10 @@ under the License.
   </prolog>
 
   <conbody>
-
-    <p>
-      <indexterm audience="hidden">SHOW statement</indexterm>
-      The <codeph>SHOW</codeph> statement is a flexible way to get information about different types of Impala
-      objects.
-    </p>
-
+    <p> The <codeph>SHOW</codeph> statement is a flexible way to get information
+      about different types of Impala objects. </p>
     <p conref="../shared/impala_common.xml#common/syntax_blurb"/>
-
-<codeblock>SHOW DATABASES [[LIKE] '<varname>pattern</varname>']
+    <codeblock>SHOW DATABASES [[LIKE] '<varname>pattern</varname>']
 SHOW SCHEMAS [[LIKE] '<varname>pattern</varname>'] - an alias for SHOW DATABASES
 SHOW TABLES [IN <varname>database_name</varname>] [[LIKE] '<varname>pattern</varname>']
 <ph rev="1.2.0">SHOW [AGGREGATE | ANALYTIC] FUNCTIONS [IN <varname>database_name</varname>] [[LIKE] '<varname>pattern</varname>']</ph>
@@ -56,39 +50,47 @@ SHOW FILES IN [<varname>database_name</varname>.]<varname>table_name</varname> <
 
 <ph rev="2.0.0">SHOW ROLES
 SHOW CURRENT ROLES
-SHOW ROLE GRANT GROUP <varname>group_name</varname>
-SHOW GRANT ROLE <varname>role_name</varname></ph>
+SHOW ROLE GRANT GROUP <varname>group_name</varname></ph>
 
-<ph rev="3.1.0">SHOW GRANT USER <varname>user_name</varname>
-SHOW GRANT USER <varname>user_name</varname> ON SERVER
+<ph rev="3.1.0">SHOW GRANT USER <varname>user_name</varname> ON SERVER
 SHOW GRANT USER <varname>user_name</varname> ON DATABASE <varname>database_name</varname>
-SHOW GRANT USER <varname>user_name</varname> ON TABLE <varname>table_name</varname>
+SHOW GRANT USER <varname>user_name</varname> ON TABLE <varname>database_name.table_name</varname>
 SHOW GRANT USER <varname>user_name</varname> ON URI <varname>uri</varname></ph>
-</codeblock>
-
-    <p>
-      Issue a <codeph>SHOW <varname>object_type</varname></codeph> statement to see the appropriate objects in the
-      current database, or <codeph>SHOW <varname>object_type</varname> IN <varname>database_name</varname></codeph>
-      to see objects in a specific database.
-    </p>
-
-    <p>
-      The optional <varname>pattern</varname> argument is a quoted string literal, using Unix-style
-      <codeph>*</codeph> wildcards and allowing <codeph>|</codeph> for alternation. The preceding
-      <codeph>LIKE</codeph> keyword is also optional. All object names are stored in lowercase, so use all
-      lowercase letters in the pattern string. For example:
-    </p>
-
-<codeblock>show databases 'a*';
-show databases like 'a*';
-show tables in some_db like '*fact*';
-use some_db;
-show tables '*dim*|*fact*';</codeblock>
-
+SHOW GRANT USER <varname>user_name</varname> ON COLUMN <varname>database_name.table_name.column_name</varname></codeblock>
+    <p>The following statements are supported only when Impala uses Sentry to
+      manage authorization.</p>
+    <codeblock>SHOW GRANT USER <varname>user_name</varname>
+
+SHOW GRANT ROLE <varname>role_name</varname>
+SHOW GRANT ROLE <varname>role_name</varname> ON SERVER
+SHOW GRANT ROLE <varname>role_name</varname> ON DATABASE <varname>database_name</varname>
+SHOW GRANT ROLE <varname>role_name</varname> ON TABLE <varname>database_name.table_name</varname>
+SHOW GRANT ROLE <varname>role_name</varname> ON URI <varname>uri</varname>
+SHOW GRANT ROLE <varname>role_name</varname> ON COLUMN <varname>database_name.table_name.column_name</varname></codeblock>
+    <p rev="3.3">The following statements are supported only when Impala uses
+      Ranger to manage authorization.</p>
+    <codeblock>SHOW GRANT GROUP <varname>group_name</varname> ON SERVER
+SHOW GRANT GROUP <varname>group_name</varname> ON DATABASE <varname>database_name</varname>
+SHOW GRANT GROUP <varname>group_name</varname> ON TABLE <varname>database_name.table_name</varname>
+SHOW GRANT GROUP <varname>group_name</varname> ON URI <varname>uri</varname>
+SHOW GRANT GROUP <varname>group_name</varname> ON COLUMN <varname>database_name.table_name.column_name</varname></codeblock>
+    <p> Issue a <codeph>SHOW <varname>object_type</varname></codeph> statement
+      to see the appropriate objects in the current database, or <codeph>SHOW
+          <varname>object_type</varname> IN
+        <varname>database_name</varname></codeph> to see objects in a specific
+      database. </p>
+    <p> The optional <varname>pattern</varname> argument is a quoted string
+      literal, using Unix-style <codeph>*</codeph> wildcards and allowing
+        <codeph>|</codeph> for alternation. The preceding <codeph>LIKE</codeph>
+      keyword is also optional. All object names are stored in lowercase, so use
+      all lowercase letters in the pattern string. For example: </p>
+    <codeblock>SHOW DATABASES 'a*';
+SHOW DATABASES LIKE 'a*';
+SHOW TABLES IN some_db LIKE '*fact*';
+USE some_db;
+SHOW TABLES '*dim*|*fact*';</codeblock>
     <p conref="../shared/impala_common.xml#common/cancel_blurb_no"/>
-
     <p outputclass="toc inpage"/>
-
   </conbody>
 
   <concept rev="2.2.0" id="show_files">


[impala] 02/04: IMPALA-8341: Data cache for remote reads

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2ece4c9b2e114a5e8873c5ac69e75b84c62bf5bd
Author: Michael Ho <kw...@cloudera.com>
AuthorDate: Tue Apr 2 18:36:55 2019 -0700

    IMPALA-8341: Data cache for remote reads
    
    This is a patch based on PhilZ's prototype: https://gerrit.cloudera.org/#/c/12683/
    
    This change implements an IO data cache which is backed by
    local storage. It implicitly relies on the OS page cache
    management to shuffle data between memory and the storage
    device. This is useful for caching data read from remote
    filesystems (e.g. remote HDFS data node, S3, ABFS, ADLS).
    
    A data cache is divided into one or more partitions based on
    the configuration string which is a list of directories, separated
    by comma, followed by the storage capacity per directory.
    An example configuration string is like the following:
      --data_cache_config=/data/0,/data/1:150GB
    
    In the configuration above, the cache may use up to 300GB of
    storage space, with 150GB max for /data/0 and /data/1 respectively.
    
    Each partition has a meta-data cache which tracks the mappings
    of cache keys to the locations of the cached data. A cache key
    is a tuple of (file's name, file's modification time, file offset)
    and a cache entry is a tuple of (backing file, offset in the backing
    file, length of the cached data, optional checksum). Note that the
    cache currently doesn't support overlapping ranges. In other words,
    if the cache contains an entry of a file for range [m, m+4MB), a lookup
    for [m+4K, m+8K) will miss in the cache. In practice, we haven't seen
    this as a problem but this may require further evaluation in the future.
    
    Each partition stores its set of cached data in backing files created
    on local storage. When inserting new data into the cache, the data is
    appended to the current backing file in use. The storage consumption
    of each cache entry counts towards the quota of that partition. When a
    partition reaches its capacity, the least recently used (LRU) data in
    that partition is evicted. Evicted data is removed from the underlying
    storage by punching holes in the backing file it's stored in. As a
    backing file reaches a certain size (by default 1TB), new data will
    stop being appended to it and a new file will be created instead. Note
    that due to hole punching, the backing file is actually sparse. When
    the number of backing files per partition exceeds
    --data_cache_max_files_per_partition, files are deleted in the order
    in which they are created. Stale cache entries referencing deleted
    files are erased lazily or evicted due to inactivity.
    
    Optionally, checksumming can be enabled to verify that data read from the
    cache is consistent with what was inserted and to verify that multiple
    attempted insertions with the same cache key have the same cache content.
    Checksumming is enabled by default for debug builds.
    
    To probe for cached data in the cache, the interface Lookup() is used;
    To insert data into the cache, the interface Store() is used. Please note
    that eviction happens inline currently during Store().
    
    This patch also added two startup flags for start-impala-cluster.py:
    '--data_cache_dir' specifies the base directory in which each Impalad
    creates the caching directory
    '--data_cache_size' specifies the capacity string for each cache directory.
    
    Testing done:
    - added a new BE and EE test
    - exhaustive (debug, release) builds with cache enabled
    - core ASAN build with cache enabled
    
    Perf:
    - 16-streams TPCDS at 3TB in a 20 node S3 cluster shows about 30% improvement
    over runs without the cache. Each node has a cache size of 150GB.
    The performance is at parity with a configuration of a HDFS cluster using
    EBS as the storage.
    
    Change-Id: I734803c1c1787c858dc3ffa0a2c0e33e77b12edc
    Reviewed-on: http://gerrit.cloudera.org:8080/12987
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-scan-node-base.cc                 |  17 +
 be/src/exec/hdfs-scan-node-base.h                  |   7 +
 be/src/runtime/io/CMakeLists.txt                   |   2 +
 be/src/runtime/io/data-cache-test.cc               | 489 ++++++++++++++
 be/src/runtime/io/data-cache.cc                    | 707 +++++++++++++++++++++
 be/src/runtime/io/data-cache.h                     | 354 +++++++++++
 be/src/runtime/io/disk-io-mgr.cc                   |  13 +
 be/src/runtime/io/disk-io-mgr.h                    |  16 +
 be/src/runtime/io/hdfs-file-reader.cc              |  86 ++-
 be/src/runtime/io/hdfs-file-reader.h               |  38 +-
 be/src/runtime/io/request-context.h                |  27 +
 be/src/util/filesystem-util-test.cc                |  36 ++
 be/src/util/filesystem-util.cc                     | 133 +++-
 be/src/util/filesystem-util.h                      |  37 +-
 be/src/util/impalad-metrics.cc                     |  22 +-
 be/src/util/impalad-metrics.h                      |  18 +-
 bin/start-impala-cluster.py                        |  19 +-
 common/thrift/metrics.json                         |  40 ++
 .../queries/QueryTest/data-cache.test              |  49 ++
 tests/common/custom_cluster_test_suite.py          |   4 +-
 tests/common/impala_test_suite.py                  |  24 +
 tests/custom_cluster/test_data_cache.py            |  43 ++
 tests/custom_cluster/test_krpc_metrics.py          |  24 -
 23 files changed, 2150 insertions(+), 55 deletions(-)

diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 15aa4f9..11fd066 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -360,11 +360,28 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   } else {
     num_disks_accessed_counter_ = NULL;
   }
+
+  data_cache_hit_count_ = ADD_COUNTER(runtime_profile(),
+      "DataCacheHitCount", TUnit::UNIT);
+  data_cache_partial_hit_count_ = ADD_COUNTER(runtime_profile(),
+      "DataCachePartialHitCount", TUnit::UNIT);
+  data_cache_miss_count_ = ADD_COUNTER(runtime_profile(),
+      "DataCacheMissCount", TUnit::UNIT);
+  data_cache_hit_bytes_ = ADD_COUNTER(runtime_profile(),
+      "DataCacheHitBytes", TUnit::BYTES);
+  data_cache_miss_bytes_ = ADD_COUNTER(runtime_profile(),
+      "DataCacheMissBytes", TUnit::BYTES);
+
   reader_context_->set_bytes_read_counter(bytes_read_counter());
   reader_context_->set_read_timer(hdfs_read_timer_);
   reader_context_->set_open_file_timer(hdfs_open_file_timer_);
   reader_context_->set_active_read_thread_counter(&active_hdfs_read_thread_counter_);
   reader_context_->set_disks_accessed_bitmap(&disks_accessed_bitmap_);
+  reader_context_->set_data_cache_hit_counter(data_cache_hit_count_);
+  reader_context_->set_data_cache_partial_hit_counter(data_cache_partial_hit_count_);
+  reader_context_->set_data_cache_miss_counter(data_cache_miss_count_);
+  reader_context_->set_data_cache_hit_bytes_counter(data_cache_hit_bytes_);
+  reader_context_->set_data_cache_miss_bytes_counter(data_cache_miss_bytes_);
 
   average_hdfs_read_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
       AVERAGE_HDFS_READ_THREAD_CONCURRENCY, &active_hdfs_read_thread_counter_);
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index a38b3ce..8397d4e 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -539,6 +539,13 @@ class HdfsScanNodeBase : public ScanNode {
   /// Total number of file handle opens where the file handle was not in the cache
   RuntimeProfile::Counter* cached_file_handles_miss_count_ = nullptr;
 
+  /// Counters for data cache.
+  RuntimeProfile::Counter* data_cache_hit_count_ = nullptr;
+  RuntimeProfile::Counter* data_cache_partial_hit_count_ = nullptr;
+  RuntimeProfile::Counter* data_cache_miss_count_ = nullptr;
+  RuntimeProfile::Counter* data_cache_hit_bytes_ = nullptr;
+  RuntimeProfile::Counter* data_cache_miss_bytes_ = nullptr;
+
   /// The amount of time scanner threads spend waiting for I/O.
   RuntimeProfile::Counter* scanner_io_wait_time_ = nullptr;
 
diff --git a/be/src/runtime/io/CMakeLists.txt b/be/src/runtime/io/CMakeLists.txt
index 2215678..29dfe1a 100644
--- a/be/src/runtime/io/CMakeLists.txt
+++ b/be/src/runtime/io/CMakeLists.txt
@@ -22,6 +22,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime/io")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime/io")
 
 add_library(Io
+  data-cache.cc
   disk-io-mgr.cc
   disk-io-mgr-stress.cc
   local-file-system.cc
@@ -40,3 +41,4 @@ add_executable(disk-io-mgr-stress-test disk-io-mgr-stress-test.cc)
 target_link_libraries(disk-io-mgr-stress-test ${IMPALA_TEST_LINK_LIBS})
 
 ADD_BE_LSAN_TEST(disk-io-mgr-test)
+ADD_BE_LSAN_TEST(data-cache-test)
\ No newline at end of file
diff --git a/be/src/runtime/io/data-cache-test.cc b/be/src/runtime/io/data-cache-test.cc
new file mode 100644
index 0000000..cab410f
--- /dev/null
+++ b/be/src/runtime/io/data-cache-test.cc
@@ -0,0 +1,489 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <boost/bind.hpp>
+#include <gflags/gflags.h>
+#include <sys/sysinfo.h>
+
+#include "gutil/strings/join.h"
+#include "runtime/io/data-cache.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
+#include "testutil/gtest-util.h"
+#include "testutil/scoped-flag-setter.h"
+#include "util/counting-barrier.h"
+#include "util/filesystem-util.h"
+#include "util/thread.h"
+
+#include "common/names.h"
+
+#define TEMP_BUFFER_SIZE   (4096)
+#define TEST_BUFFER_SIZE   (8192)
+#define NUM_TEST_DIRS      (16)
+#define NUM_THREADS        (18)
+#define FNAME              ("foobar")
+#define MTIME              (12345)
+#define DEFAULT_CACHE_SIZE (4 * 1024 * 1024)
+
+DECLARE_int64(data_cache_file_max_size_bytes);
+DECLARE_int32(data_cache_max_opened_files);
+DECLARE_int32(data_cache_write_concurrency);
+DECLARE_bool(cache_force_single_shard);
+
+namespace impala {
+namespace io {
+
+class DataCacheTest : public testing::Test {
+ public:
+  const uint8_t* test_buffer() {
+    return reinterpret_cast<const uint8_t*>(test_buffer_);
+  }
+
+  const std::vector<string>& data_cache_dirs() {
+    return data_cache_dirs_;
+  }
+
+  //
+  // Use multiple threads to insert and read back a set of ranges from test_buffer().
+  // Depending on the setting, the working set may or may not fit in the cache.
+  //
+  // 'cache'                   : the data cache to insert into and read back from
+  // 'max_start_offset'        : exclusive upper bound of the offsets in test_buffer()
+  //                             from which entries are copied into the cache
+  // 'use_per_thread_filename' : If true, the filename used when inserting into the cache
+  //                             will be prefixed with the thread name, which is unique
+  //                             per thread; If false, the prefix for filenames is empty
+  // 'expect_misses'           : If true, expect there will be cache misses when reading
+  //                             from the cache
+  //
+  void MultiThreadedReadWrite(DataCache* cache, int64_t max_start_offset,
+      bool use_per_thread_filename, bool expect_misses) {
+    // Barrier to synchronize all threads so no thread will start probing the cache until
+    // all insertions are done.
+    CountingBarrier barrier(NUM_THREADS);
+
+    vector<unique_ptr<Thread>> threads;
+    int num_misses[NUM_THREADS];
+    for (int i = 0; i < NUM_THREADS; ++i) {
+      unique_ptr<Thread> thread;
+      num_misses[i] = 0;
+      string thread_name = Substitute("thread-$0", i);
+      ASSERT_OK(Thread::Create("data-cache-test", thread_name,
+          boost::bind(&DataCacheTest::ThreadFn, this,
+             use_per_thread_filename ? thread_name : "", cache, max_start_offset,
+             &barrier, &num_misses[i]), &thread));
+      threads.emplace_back(std::move(thread));
+    }
+    int cache_misses = 0;
+    for (int i = 0; i < NUM_THREADS; ++i) {
+      threads[i]->Join();
+      cache_misses += num_misses[i];
+    }
+    if (expect_misses) {
+      ASSERT_GT(cache_misses, 0);
+    } else {
+      ASSERT_EQ(0, cache_misses);
+    }
+
+    // Verify the backing files don't exceed size limits.
+    ASSERT_OK(cache->CloseFilesAndVerifySizes());
+  }
+
+ protected:
+  DataCacheTest() {
+    // Create a buffer of random characters.
+    for (int i = 0; i < TEST_BUFFER_SIZE; ++i) {
+      test_buffer_[i] = '!' + (rand() % 93);
+    }
+  }
+
+  // Create a bunch of test directories in which the data cache will reside.
+  virtual void SetUp() {
+    test_env_.reset(new TestEnv());
+    flag_saver_.reset(new google::FlagSaver());
+    ASSERT_OK(test_env_->Init());
+    for (int i = 0; i < NUM_TEST_DIRS; ++i) {
+      const string& path = Substitute("/tmp/data-cache-test.$0", i);
+      ASSERT_OK(FileSystemUtil::RemoveAndCreateDirectory(path));
+      data_cache_dirs_.push_back(path);
+    }
+
+    // Force a single shard to avoid imbalance between shards in the Kudu LRU cache which
+    // may lead to unintended eviction. This allows the capacity of the cache to be
+    // utilized to the limit for the purpose of this test.
+    FLAGS_cache_force_single_shard = true;
+
+    // Allows full write concurrency for the multi-threaded tests.
+    FLAGS_data_cache_write_concurrency = NUM_THREADS;
+  }
+
+  // Delete all the test directories created.
+  virtual void TearDown() {
+    // Make sure the cache's destructor removes all backing files.
+    for (const string& dir_path : data_cache_dirs_) {
+      vector<string> entries;
+      ASSERT_OK(FileSystemUtil::Directory::GetEntryNames(dir_path, &entries));
+      ASSERT_EQ(0, entries.size());
+    }
+    ASSERT_OK(FileSystemUtil::RemovePaths(data_cache_dirs_));
+    flag_saver_.reset();
+    test_env_.reset();
+  }
+
+ private:
+  std::unique_ptr<TestEnv> test_env_;
+  char test_buffer_[TEST_BUFFER_SIZE];
+  vector<string> data_cache_dirs_;
+
+  // Saved configuration flags for restoring the values at the end of the test.
+  std::unique_ptr<google::FlagSaver> flag_saver_;
+
+  // The function is invoked by each thread in the multi-threaded tests below.
+  // It inserts chunks of size TEMP_BUFFER_SIZE from different offsets at range
+  // [0, 'max_start_offset') from 'test_buffer_'. Afterwards, it tries reading back
+  // all the inserted chunks.
+  //
+  // 'fname_prefix' is the prefix added to the default filename when inserting into
+  // the cache. 'max_start_offset' and 'fname_prefix' implicitly control the combination
+  // of cache keys which indirectly control the cache footprint. 'num_misses' records
+  // the number of cache misses.
+  void ThreadFn(const string& fname_prefix, DataCache* cache, int64_t max_start_offset,
+      CountingBarrier* store_barrier, int* num_misses) {
+    const string& custom_fname = Substitute("$0file", fname_prefix);
+    vector<int64_t> offsets;
+    for (int64_t offset = 0; offset < max_start_offset; ++offset) {
+      offsets.push_back(offset);
+    }
+    random_shuffle(offsets.begin(), offsets.end());
+    for (int64_t offset : offsets) {
+      cache->Store(custom_fname, MTIME, offset, test_buffer() + offset,
+          TEMP_BUFFER_SIZE);
+    }
+    // Wait until all threads have finished inserting. Since different threads may be
+    // inserting the same cache key and collide, only one thread which wins the race will
+    // insert the cache entry. Make sure other threads which lose out on the race will
+    // wait for the insertion to complete first before proceeding.
+    store_barrier->Notify();
+    store_barrier->Wait();
+    for (int64_t offset : offsets) {
+      uint8_t buffer[TEMP_BUFFER_SIZE];
+      memset(buffer, 0, TEMP_BUFFER_SIZE);
+      int64_t bytes_read =
+          cache->Lookup(custom_fname, MTIME, offset, TEMP_BUFFER_SIZE, buffer);
+      if (bytes_read == TEMP_BUFFER_SIZE) {
+        ASSERT_EQ(0, memcmp(buffer, test_buffer() + offset, TEMP_BUFFER_SIZE));
+      } else {
+        ASSERT_EQ(bytes_read, 0);
+        ++(*num_misses);
+      }
+    }
+  }
+};
+
+// This test exercises the basic insertion and lookup paths by inserting a known set of
+// offsets which fit in the cache entirely. Also tries reading entries which are never
+// inserted into the cache.
+TEST_F(DataCacheTest, TestBasics) {
+  const int64_t cache_size = DEFAULT_CACHE_SIZE;
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+  ASSERT_OK(cache.Init());
+
+  // Temporary buffer for holding results read from the cache.
+  uint8_t buffer[TEMP_BUFFER_SIZE];
+  // Read and then insert a range of offsets. Expected all misses in the first iteration
+  // and all hits in the second iteration.
+  for (int i = 0; i < 2; ++i) {
+    for (int64_t offset = 0; offset < 1024; ++offset) {
+      int expected_bytes = i * TEMP_BUFFER_SIZE;
+      memset(buffer, 0, TEMP_BUFFER_SIZE);
+      ASSERT_EQ(expected_bytes,
+          cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer)) << offset;
+      if (i == 0) {
+        ASSERT_TRUE(cache.Store(FNAME, MTIME, offset, test_buffer() + offset,
+            TEMP_BUFFER_SIZE));
+      } else {
+        ASSERT_EQ(0, memcmp(test_buffer() + offset, buffer, TEMP_BUFFER_SIZE));
+      }
+    }
+  }
+
+  // Read the same range inserted previously but with a different filename: must miss.
+  for (int64_t offset = 0; offset < 1024; ++offset) {
+    const string& alt_fname = "random";
+    ASSERT_EQ(0, cache.Lookup(alt_fname, MTIME, offset, TEMP_BUFFER_SIZE, buffer));
+  }
+
+  // Read the same range inserted previously but with a different mtime: must miss.
+  for (int64_t offset = 0; offset < 1024; ++offset) {
+    int64_t alt_mtime = 67890;
+    ASSERT_EQ(0, cache.Lookup(FNAME, alt_mtime, offset, TEMP_BUFFER_SIZE, buffer));
+  }
+
+  // Read a range of offsets which were never inserted and should miss in the cache.
+  for (int64_t offset = 1024; offset < TEST_BUFFER_SIZE; ++offset) {
+    ASSERT_EQ(0, cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer));
+  }
+
+  // Read the same range inserted previously. They should still all be in the cache.
+  for (int64_t offset = 0; offset < 1024; ++offset) {
+    memset(buffer, 0, TEMP_BUFFER_SIZE);
+    ASSERT_EQ(TEMP_BUFFER_SIZE,
+        cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE + 10, buffer));
+    ASSERT_EQ(0, memcmp(test_buffer() + offset, buffer, TEMP_BUFFER_SIZE));
+    ASSERT_EQ(TEMP_BUFFER_SIZE - 10,
+        cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE - 10, buffer));
+    ASSERT_EQ(0, memcmp(test_buffer() + offset, buffer, TEMP_BUFFER_SIZE - 10));
+  }
+
+  // Insert with the same key but different length.
+  uint8_t buffer2[TEMP_BUFFER_SIZE + 10];
+  const int64_t larger_entry_size = TEMP_BUFFER_SIZE + 10;
+  ASSERT_TRUE(cache.Store(FNAME, MTIME, 0, test_buffer(), larger_entry_size));
+  memset(buffer2, 0, larger_entry_size);
+  ASSERT_EQ(larger_entry_size,
+      cache.Lookup(FNAME, MTIME, 0, larger_entry_size, buffer2));
+  ASSERT_EQ(0, memcmp(test_buffer(), buffer2, larger_entry_size));
+
+  // Check that an insertion larger than the cache size will fail.
+  ASSERT_FALSE(cache.Store(FNAME, MTIME, 0, test_buffer(), cache_size * 2));
+}
+
+// Tests backing file rotation by setting FLAGS_data_cache_file_max_size_bytes to be 1/4
+// of the cache size. This forces rotation of backing files.
+TEST_F(DataCacheTest, RotateFiles) {
+  // Set the maximum size of backing files to be 1/4 of the cache size.
+  FLAGS_data_cache_file_max_size_bytes = 1024 * 1024;
+  const int64_t cache_size = 4 * FLAGS_data_cache_file_max_size_bytes ;
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+  ASSERT_OK(cache.Init());
+
+  // Read and then insert a range of offsets. Expected all misses in the first iteration
+  // and all hits in the second iteration.
+  for (int i = 0; i < 2; ++i) {
+    for (int64_t offset = 0; offset < 1024; ++offset) {
+      int expected_bytes = i * TEMP_BUFFER_SIZE;
+      uint8_t buffer[TEMP_BUFFER_SIZE];
+      memset(buffer, 0, TEMP_BUFFER_SIZE);
+      ASSERT_EQ(expected_bytes,
+          cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer)) << offset;
+      if (i == 0) {
+        ASSERT_TRUE(cache.Store(FNAME, MTIME, offset, test_buffer() + offset,
+            TEMP_BUFFER_SIZE));
+      } else {
+        ASSERT_EQ(0, memcmp(test_buffer() + offset, buffer, TEMP_BUFFER_SIZE));
+      }
+    }
+  }
+
+  // Verify that rotation left exactly 4 regular backing files in the cache directory.
+  vector<string> entries;
+  ASSERT_OK(FileSystemUtil::Directory::GetEntryNames(data_cache_dirs()[0], &entries, -1,
+      FileSystemUtil::Directory::EntryType::DIR_ENTRY_REG));
+  ASSERT_EQ(4, entries.size());
+
+  // Verify the backing files don't exceed size limits.
+  ASSERT_OK(cache.CloseFilesAndVerifySizes());
+}
+
+// Tests backing file rotation by setting --data_cache_file_max_size_bytes to be 1/4
+// of the cache size. This forces rotation of backing files. Also sets
+// --data_cache_max_opened_files to 1 so that only one underlying
+// file is allowed. This exercises the lazy deletion path.
+TEST_F(DataCacheTest, RotateAndDeleteFiles) {
+  // Set the maximum size of backing files to be 1/4 of the cache size.
+  FLAGS_data_cache_file_max_size_bytes = 1024 * 1024;
+  // Force to allow one backing file.
+  FLAGS_data_cache_max_opened_files = 1;
+
+  const int64_t cache_size = 4 * FLAGS_data_cache_file_max_size_bytes ;
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+  ASSERT_OK(cache.Init());
+
+  // Read and insert a working set the same size of the cache. Expected all misses
+  // in all iterations as the backing file is only 1/4 of the cache capacity.
+  uint8_t buffer[TEMP_BUFFER_SIZE];
+  for (int i = 0; i < 4; ++i) {
+    for (int64_t offset = 0; offset < 1024; ++offset) {
+      memset(buffer, 0, TEMP_BUFFER_SIZE);
+      ASSERT_EQ(0, cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer));
+      ASSERT_TRUE(cache.Store(FNAME, MTIME, offset, test_buffer() + offset,
+          TEMP_BUFFER_SIZE));
+    }
+  }
+
+  // Verifies that the last part of the working set which fits in a single backing file
+  // is still in the cache.
+  int64_t in_cache_offset =
+      1024 - FLAGS_data_cache_file_max_size_bytes / TEMP_BUFFER_SIZE;
+  for (int64_t offset = in_cache_offset ; offset < 1024; ++offset) {
+    memset(buffer, 0, TEMP_BUFFER_SIZE);
+    ASSERT_EQ(TEMP_BUFFER_SIZE, cache.Lookup(FNAME, MTIME, offset,
+        TEMP_BUFFER_SIZE, buffer));
+    ASSERT_EQ(0, memcmp(buffer, test_buffer() + offset, TEMP_BUFFER_SIZE));
+  }
+
+  // Make sure only one backing file exists. Allow for 10 seconds latency for the
+  // file deleter thread to run.
+  int num_entries = 0;
+  const int num_wait_secs = 10;
+  for (int i = 0; i <= num_wait_secs; ++i) {
+    vector<string> entries;
+    ASSERT_OK(FileSystemUtil::Directory::GetEntryNames(data_cache_dirs()[0], &entries, -1,
+        FileSystemUtil::Directory::EntryType::DIR_ENTRY_REG));
+    num_entries = entries.size();
+    if (num_entries == 1) break;
+    // Flag a failure on the 11-th time around this loop.
+    ASSERT_LT(i, num_wait_secs);
+    sleep(1);
+  }
+}
+
+// Tests eviction in the cache by inserting a large entry which evicts all existing
+// entries in the cache.
+TEST_F(DataCacheTest, Eviction) {
+  const int64_t cache_size = DEFAULT_CACHE_SIZE;
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+  ASSERT_OK(cache.Init());
+
+  // Look up and then insert 1028 chunks of size TEMP_BUFFER_SIZE from the test buffer.
+  // All lookups are expected to miss in both iterations due to LRU eviction.
+  uint8_t buffer[TEMP_BUFFER_SIZE];
+  int64_t offset = 0;
+  for (int i = 0; i < 2; ++i) {
+    for (offset = 0; offset < 1028; ++offset) {
+      ASSERT_EQ(0, cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer));
+      ASSERT_TRUE(cache.Store(FNAME, MTIME, offset, test_buffer() + offset,
+          TEMP_BUFFER_SIZE));
+    }
+  }
+  // Verifies that the cache is full: only 1024 of the 1028 entries should be present.
+  int hit_count = 0;
+  for (offset = 0; offset < 1028; ++offset) {
+    int64_t bytes_read = cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer);
+    DCHECK(bytes_read == 0 || bytes_read == TEMP_BUFFER_SIZE);
+    if (bytes_read == TEMP_BUFFER_SIZE) ++hit_count;
+  }
+  ASSERT_EQ(1024, hit_count);
+
+  // Create a buffer which has the same size as the cache and insert it into the cache.
+  // This should evict all the existing entries in the cache.
+  unique_ptr<uint8_t[]> large_buffer(new uint8_t[DEFAULT_CACHE_SIZE]);
+  for (offset = 0; offset < cache_size; offset += TEST_BUFFER_SIZE) {
+    memcpy(large_buffer.get() + offset, test_buffer(), TEST_BUFFER_SIZE);
+  }
+  const string& alt_fname = "random";
+  offset = 0;
+  ASSERT_TRUE(cache.Store(alt_fname, MTIME, offset, large_buffer.get(), cache_size));
+
+  // Verifies that all previous entries are all evicted.
+  for (offset = 0; offset < 1028; ++offset) {
+    ASSERT_EQ(0, cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer));
+  }
+  // The large buffer should still be in the cache.
+  unique_ptr<uint8_t[]> temp_buffer(new uint8_t[DEFAULT_CACHE_SIZE]);
+  offset = 0;
+  ASSERT_EQ(cache_size,
+      cache.Lookup(alt_fname, MTIME, offset, cache_size, temp_buffer.get()));
+  ASSERT_EQ(0, memcmp(temp_buffer.get(), large_buffer.get(), cache_size));
+
+  // Verify the backing files don't exceed size limits.
+  ASSERT_OK(cache.CloseFilesAndVerifySizes());
+}
+
+// Tests insertion and lookup with the cache with multiple threads.
+// Inserts a working set which will fit in the cache. Despite potential
+// collision during insertion, all entries in the working set should be found.
+TEST_F(DataCacheTest, MultiThreadedNoMisses) {
+  int64_t cache_size = DEFAULT_CACHE_SIZE;
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+  ASSERT_OK(cache.Init());
+
+  int64_t max_start_offset = 1024;
+  bool use_per_thread_filename = false;
+  bool expect_misses = false;
+  MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename,
+      expect_misses);
+}
+
+// Inserts a working set which is known to be larger than the cache's capacity.
+// Expect some cache misses in lookups.
+TEST_F(DataCacheTest, MultiThreadedWithMisses) {
+  int64_t cache_size = DEFAULT_CACHE_SIZE;
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+  ASSERT_OK(cache.Init());
+
+  int64_t max_start_offset = 1024;
+  bool use_per_thread_filename = true;
+  bool expect_misses = true;
+  MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename,
+      expect_misses);
+}
+
+// Test insertion and lookup with a cache configured with multiple partitions.
+TEST_F(DataCacheTest, MultiPartitions) {
+  StringPiece delimiter(",");
+  string cache_base = JoinStrings(data_cache_dirs(), delimiter);
+  const int64_t cache_size = DEFAULT_CACHE_SIZE;
+  DataCache cache(Substitute("$0:$1", cache_base, std::to_string(cache_size)));
+  ASSERT_OK(cache.Init());
+
+  int64_t max_start_offset = 512;
+  bool use_per_thread_filename = false;
+  bool expect_misses = false;
+  MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename,
+      expect_misses);
+}
+
+// Tests insertion of a working set whose size is 1/8 of the total memory size.
+// This likely exceeds the size of the page cache and forces write back of dirty pages in
+// the page cache to the backing files and also read from the backing files during lookup.
+TEST_F(DataCacheTest, LargeFootprint) {
+  struct sysinfo info;
+  ASSERT_EQ(0, sysinfo(&info));
+  ASSERT_GT(info.totalram, 0);
+  DataCache cache(
+      Substitute("$0:$1", data_cache_dirs()[0], std::to_string(info.totalram)));
+  ASSERT_OK(cache.Init());
+
+  const int64_t footprint = info.totalram / 8;
+  for (int64_t i = 0; i < footprint / TEST_BUFFER_SIZE; ++i) {
+    int64_t offset = i * TEST_BUFFER_SIZE;
+    ASSERT_TRUE(cache.Store(FNAME, MTIME, offset, test_buffer(), TEST_BUFFER_SIZE));
+  }
+  uint8_t buffer[TEST_BUFFER_SIZE];
+  for (int64_t i = 0; i < footprint / TEST_BUFFER_SIZE; ++i) {
+    int64_t offset = i * TEST_BUFFER_SIZE;
+    memset(buffer, 0, TEST_BUFFER_SIZE);
+    ASSERT_EQ(TEST_BUFFER_SIZE,
+        cache.Lookup(FNAME, MTIME, offset, TEST_BUFFER_SIZE, buffer));
+    ASSERT_EQ(0, memcmp(buffer, test_buffer(), TEST_BUFFER_SIZE));
+  }
+}
+
+} // namespace io
+} // namespace impala
+
+int main(int argc, char **argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+  int rand_seed = time(NULL);
+  LOG(INFO) << "rand_seed: " << rand_seed;
+  srand(rand_seed);
+  return RUN_ALL_TESTS();
+}
diff --git a/be/src/runtime/io/data-cache.cc b/be/src/runtime/io/data-cache.cc
new file mode 100644
index 0000000..11f13b6
--- /dev/null
+++ b/be/src/runtime/io/data-cache.cc
@@ -0,0 +1,707 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/io/data-cache.h"
+
+#include <boost/algorithm/string.hpp>
+#include <errno.h>
+#include <fcntl.h>
+#include <mutex>
+#include <string.h>
+#include <unistd.h>
+
+#include "exec/kudu-util.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/env.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/path_util.h"
+#include "gutil/strings/split.h"
+#include "util/bit-util.h"
+#include "util/error-util.h"
+#include "util/filesystem-util.h"
+#include "util/hash-util.h"
+#include "util/impalad-metrics.h"
+#include "util/parse-util.h"
+#include "util/pretty-printer.h"
+#include "util/scope-exit-trigger.h"
+#include "util/uid-util.h"
+
+#ifndef FALLOC_FL_PUNCH_HOLE
+#include <linux/falloc.h>
+#endif
+
+#include "common/names.h"
+
+using kudu::faststring;
+using kudu::JoinPathSegments;
+using kudu::percpu_rwlock;
+using kudu::RWFile;
+using kudu::rw_spinlock;
+using kudu::Slice;
+using strings::SkipEmpty;
+using strings::Split;
+
+#ifdef NDEBUG
+#define ENABLE_CHECKSUMMING (false)
+#else
+#define ENABLE_CHECKSUMMING (true)
+#endif
+
+DEFINE_int64(data_cache_file_max_size_bytes, 1L << 40 /* 1TB */,
+    "(Advanced) The maximum size which a cache file can grow to before data stops being "
+    "appended to it.");
+DEFINE_int32(data_cache_max_opened_files, 1000,
+    "(Advanced) The maximum number of allowed opened files. This must be at least the "
+    "number of specified partitions.");
+DEFINE_int32(data_cache_write_concurrency, 1,
+    "(Advanced) Number of concurrent threads allowed to insert into the cache per "
+    "partition.");
+DEFINE_bool(data_cache_checksum, ENABLE_CHECKSUMMING,
+    "(Advanced) Enable checksumming for the cached buffer.");
+
+namespace impala {
+namespace io {
+
+static const int64_t PAGE_SIZE = 1L << 12;
+const char* DataCache::Partition::CACHE_FILE_PREFIX = "impala-cache-file-";
+const int MAX_FILE_DELETER_QUEUE_SIZE = 500;
+
+/// This class is an implementation of backing files in a cache partition.
+///
+/// A partition uses the interface Create() to create a backing file. A reader can read
+/// from the backing file using the interface Read().
+///
+/// The backing file is append-only. To insert new data into the file, Allocate() is
+/// called to reserve a contiguous area in the backing file. If the reservation succeeds,
+/// the insertion offset is returned. Write() is called to add the data at the insertion
+/// offset in the backing file. Allocations in the file may be evicted by punching a hole
+/// (via PunchHole()) in the backing file. The data in the hole area is reclaimed by the
+/// underlying filesystem.
+///
+/// To avoid having too many backing files opened, old files are deleted to keep the
+/// number of opened files within --data_cache_max_opened_files. Files are deleted
+/// asynchronously by the file deleter thread pool. To synchronize between file deletion
+/// and concurrent accesses of the file via Read()/Write()/PunchHole(), the reader lock
+/// is held in those functions. Before a file is deleted, Close() must be called by
+/// the deleter thread, which holds the writer lock to block off all readers and sets
+/// 'file_' to NULL. Read()/Write()/PunchHole() will check whether 'file_' is NULL.
+/// If the file is already closed, the function will fail. On a failure of Read(), the
+/// caller is expected to delete the stale cache entry. On a failure of Write(), the
+/// caller is not expected to insert the cache entry. In other words, any stale cache
+/// entry which references a deleted file will either be lazily erased on Read() or
+/// evicted due to inactivity.
+class DataCache::CacheFile {
+ public:
+  ~CacheFile() {
+    // Close the file if it's not closed already and remove it from the filesystem.
+    DeleteFile();
+  }
+
+  static Status Create(std::string path, std::unique_ptr<CacheFile>* cache_file_ptr) {
+    unique_ptr<CacheFile> cache_file(new CacheFile(path));
+    KUDU_RETURN_IF_ERROR(kudu::Env::Default()->NewRWFile(path, &cache_file->file_),
+        "Failed to create cache file");
+    *cache_file_ptr = std::move(cache_file);
+    return Status::OK();
+  }
+
+  // Close the underlying file so it cannot be read or written to anymore.
+  void Close() {
+    // Explicitly hold the lock in write mode to block all readers. This ensures that
+    // setting 'file_' to NULL and 'allow_append_' to false below is atomic.
+    std::unique_lock<percpu_rwlock> lock(lock_);
+    // If the file is already closed, nothing to do.
+    if (!file_) return;
+    kudu::Status status = file_->Close();
+    if (!status.ok()) {
+      LOG(WARNING) << Substitute("Failed to close cache file $0: $1", path_,
+          status.ToString());
+    }
+    file_.reset();
+    allow_append_ = false;
+  }
+
+  // Close the underlying file and delete it from the filesystem.
+  void DeleteFile() {
+    Close();
+    DCHECK(!file_);
+    kudu::Status status = kudu::Env::Default()->DeleteFile(path_);
+    if (!status.ok()) {
+      LOG(WARNING) << Substitute("Failed to unlink $0: $1", path_, status.ToString());
+    }
+  }
+
+  // Allocates a chunk of 'len' bytes in this file. The cache partition's lock
+  // 'partition_lock' must be held when calling this function. Returns the byte offset
+  // into the file for insertion. 'len' is expected to be multiples of PAGE_SIZE.
+  // Returns -1 if the file doesn't have enough space for insertion.
+  int64_t Allocate(int64_t len, const std::unique_lock<SpinLock>& partition_lock) {
+    DCHECK(partition_lock.owns_lock());
+    DCHECK_EQ(len % PAGE_SIZE, 0);
+    DCHECK_EQ(current_offset_ % PAGE_SIZE, 0);
+    // Hold the lock in shared mode to check if 'file_' is not closed already.
+    kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
+    if (!allow_append_ || (current_offset_ + len > FLAGS_data_cache_file_max_size_bytes &&
+            current_offset_ > 0)) {
+      allow_append_ = false;
+      return -1;
+    }
+    DCHECK(file_);
+    int64_t insertion_offset = current_offset_;
+    current_offset_ += len;
+    return insertion_offset;
+  }
+
+  // Reads from byte offset 'offset' for 'bytes_to_read' bytes into 'buffer'.
+  // Returns true iff read succeeded. Returns false on error or if the file
+  // is already closed.
+  bool Read(int64_t offset, uint8_t* buffer, int64_t bytes_to_read) {
+    DCHECK_EQ(offset % PAGE_SIZE, 0);
+    // Hold the lock in shared mode to check if 'file_' is not closed already.
+    kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
+    if (UNLIKELY(!file_)) return false;
+    DCHECK_LE(offset + bytes_to_read, current_offset_);
+    kudu::Status status = file_->Read(offset, Slice(buffer, bytes_to_read));
+    if (UNLIKELY(!status.ok())) {
+      LOG(ERROR) << Substitute("Failed to read from $0 at offset $1 for $2 bytes: $3",
+          path_, offset, PrettyPrinter::PrintBytes(bytes_to_read), status.ToString());
+      return false;
+    }
+    return true;
+  }
+
+  // Writes 'buffer' of length 'buffer_len' into  byte offset 'offset' in the file.
+  // Returns true iff write succeeded. Returns false on errors or if the file is
+  // already closed.
+  bool Write(int64_t offset, const uint8_t* buffer, int64_t buffer_len) {
+    DCHECK_EQ(offset % PAGE_SIZE, 0);
+    DCHECK_LE(offset, current_offset_);
+    // Hold the lock in shared mode to check if 'file_' is not closed already.
+    kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
+    if (UNLIKELY(!file_)) return false;
+    DCHECK_LE(offset + buffer_len, current_offset_);
+    kudu::Status status = file_->Write(offset, Slice(buffer, buffer_len));
+    if (UNLIKELY(!status.ok())) {
+      LOG(ERROR) << Substitute("Failed to write to $0 at offset $1 for $2 bytes: $3",
+          path_, offset, PrettyPrinter::PrintBytes(buffer_len), status.ToString());
+      return false;
+    }
+    return true;
+  }
+
+  // Punches a hole of 'hole_size' bytes at byte offset 'offset' in the file to release
+  // the space of an evicted allocation. No-op if the file is already closed.
+  void PunchHole(int64_t offset, int64_t hole_size) {
+    DCHECK_EQ(offset % PAGE_SIZE, 0);
+    DCHECK_EQ(hole_size % PAGE_SIZE, 0);
+    // Hold the lock in shared mode to check if 'file_' is not closed already.
+    kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
+    if (UNLIKELY(!file_)) return;
+    DCHECK_LE(offset + hole_size, current_offset_);
+    kudu::Status status = file_->PunchHole(offset, hole_size);
+    if (UNLIKELY(!status.ok())) {
+      LOG(DFATAL) << Substitute("Failed to punch hole in $0 at offset $1 for $2 bytes: $3",
+          path_, offset, PrettyPrinter::PrintBytes(hole_size), status.ToString());
+    }
+  }
+
+  const string& path() const { return path_; }
+
+ private:
+  /// Full path of the backing file in the local storage.
+  const string path_;
+
+  /// The underlying backing file. NULL if the file has been closed.
+  unique_ptr<RWFile> file_;
+
+  /// True iff it's okay to append to this backing file.
+  bool allow_append_ = true;
+
+  /// The current offset in the file to append to on next insert.
+  int64_t current_offset_ = 0;
+
+  /// Reader-writer lock used for synchronization with the deleter thread. It is taken in
+  /// write mode in Close() and in shared mode everywhere else. All callers other than
+  /// Close() must check that 'file_' is not NULL with the lock held in shared mode, so
+  /// Close() can safely close the file once it holds the lock exclusively. After Close(),
+  /// the file can no longer be read, written or hole punched; it can only be deleted.
+  percpu_rwlock lock_;
+
+  /// C'tor of CacheFile to be called by Create() only.
+  explicit CacheFile(std::string path) : path_(move(path)) { }
+
+  DISALLOW_COPY_AND_ASSIGN(CacheFile);
+};
+
+/// An entry in the metadata cache in a partition.
+/// Contains the whereabouts of the cached content.
+class DataCache::CacheEntry {
+ public:
+  explicit CacheEntry(CacheFile* file, int64_t offset, int64_t len, uint64_t checksum)
+    : file_(file), offset_(offset), len_(len), checksum_(checksum) { }
+
+  // Unpacks a cache entry from its serialized form in 'value'. The bytes are copied
+  // instead of casting the pointer to avoid any potential alignment issue. The members
+  // are deliberately not const as overwriting const members via memcpy() is undefined.
+  explicit CacheEntry(const Slice& value) {
+    DCHECK_EQ(value.size(), sizeof(CacheEntry));
+    memcpy(this, value.data(), value.size());
+  }
+
+  CacheFile* file() const { return file_; }
+  int64_t offset() const { return offset_; }
+  int64_t len() const { return len_; }
+  uint64_t checksum() const { return checksum_; }
+
+ private:
+  /// The backing file holding the cached content.
+  CacheFile* file_ = nullptr;
+
+  /// The starting byte offset in the backing file at which the content is stored.
+  int64_t offset_ = 0;
+
+  /// The length in bytes of the cached content.
+  int64_t len_ = 0;
+
+  /// Optional checksum of the content computed when inserting the cache entry.
+  uint64_t checksum_ = 0;
+};
+
+/// Lookup key of the cache: file name + raw bytes of 'mtime' + raw bytes of 'offset'.
+struct DataCache::CacheKey {
+ public:
+  explicit CacheKey(const string& filename, int64_t mtime, int64_t offset)
+    : key_(filename.size() + sizeof(mtime) + sizeof(offset)) {
+    key_.append(filename);
+    key_.append(&mtime, sizeof(mtime));
+    key_.append(&offset, sizeof(offset));
+  }
+
+  int64_t Hash() const {  // Hash of the serialized key bytes.
+    return HashUtil::FastHash64(key_.data(), key_.size(), 0);
+  }
+
+  Slice ToSlice() const {  // View of the serialized key bytes.
+    return key_;
+  }
+
+ private:
+  faststring key_;  // The serialized key: filename, then mtime, then offset.
+};
+
+DataCache::Partition::Partition(const string& path, int64_t capacity,
+    int max_opened_files)
+  : path_(path), capacity_(max<int64_t>(capacity, PAGE_SIZE)),  // At least one page.
+    max_opened_files_(max_opened_files),
+    meta_cache_(NewLRUCache(kudu::DRAM_CACHE, capacity_, path_)) {
+}
+
+DataCache::Partition::~Partition() {
+  if (!closed_) ReleaseResources();  // Skipped if resources were released explicitly.
+}
+
+Status DataCache::Partition::CreateCacheFile() {
+  lock_.DCheckLocked();  // The partition lock must already be held.
+  const string file_name = CACHE_FILE_PREFIX + PrintId(GenerateUUID());
+  const string file_path = JoinPathSegments(path_, file_name);
+  unique_ptr<CacheFile> new_file;
+  RETURN_IF_ERROR(CacheFile::Create(file_path, &new_file));
+  cache_files_.emplace_back(std::move(new_file));
+  LOG(INFO) << "Created cache file " << file_path;
+  return Status::OK();
+}
+
+Status DataCache::Partition::DeleteExistingFiles() const {
+  vector<string> file_names;
+  RETURN_IF_ERROR(FileSystemUtil::Directory::GetEntryNames(path_, &file_names, 0,
+      FileSystemUtil::Directory::EntryType::DIR_ENTRY_REG));
+  kudu::Env* env = kudu::Env::Default();
+  for (const string& name : file_names) {
+    if (name.find(CACHE_FILE_PREFIX) != 0) continue;  // Not a cache backing file.
+    const string stale_path = JoinPathSegments(path_, name);
+    KUDU_RETURN_IF_ERROR(env->DeleteFile(stale_path),
+        Substitute("Failed to delete old cache file $0", stale_path));
+    LOG(INFO) << Substitute("Deleted old cache file $0", stale_path);
+  }
+  return Status::OK();
+}
+
+Status DataCache::Partition::Init() {
+  std::unique_lock<SpinLock> partition_lock(lock_);  // Held until this function returns.
+
+  // Verify that the specified path is canonical and refers to a directory.
+  if (!FileSystemUtil::IsCanonicalPath(path_)) {
+    return Status(Substitute("$0 is not a canonical path", path_));
+  }
+  RETURN_IF_ERROR(FileSystemUtil::VerifyIsDirectory(path_));
+
+  // Delete all existing backing files left over from previous runs.
+  RETURN_IF_ERROR(DeleteExistingFiles());
+
+  // Check that the directory currently has room for the whole configured capacity.
+  uint64_t available_bytes;
+  RETURN_IF_ERROR(FileSystemUtil::GetSpaceAvailable(path_, &available_bytes));
+  if (available_bytes < capacity_) {
+    const string& err = Substitute("Insufficient space for $0. Required $1. Only $2 is "
+        "available", path_, PrettyPrinter::PrintBytes(capacity_),
+        PrettyPrinter::PrintBytes(available_bytes));
+    LOG(ERROR) << err;
+    return Status(err);
+  }
+
+  // Make sure hole punching is supported for the caching directory.
+  RETURN_IF_ERROR(FileSystemUtil::CheckHolePunch(path_));
+
+  // Create the first backing file for the partition.
+  RETURN_IF_ERROR(CreateCacheFile());
+  oldest_opened_file_ = 0;  // Index of the oldest opened file in 'cache_files_'.
+  return Status::OK();
+}
+
+Status DataCache::Partition::CloseFilesAndVerifySizes() {
+  kudu::Env* const env = kudu::Env::Default();
+  int64_t bytes_on_disk = 0;
+  for (auto& cache_file : cache_files_) {
+    // Some filesystems (e.g. XFS) preallocate the file beyond EOF. Closing the backing
+    // file before checking its sizes removes any such preallocation.
+    cache_file->Close();
+    const string& file_path = cache_file->path();
+    uint64_t physical_size;
+    KUDU_RETURN_IF_ERROR(env->GetFileSizeOnDisk(file_path, &physical_size),
+        "CloseFilesAndVerifySizes()");
+    bytes_on_disk += physical_size;
+    uint64_t logical_size;
+    KUDU_RETURN_IF_ERROR(env->GetFileSize(file_path, &logical_size),
+        "CloseFilesAndVerifySizes()");
+    DCHECK_LE(logical_size, FLAGS_data_cache_file_max_size_bytes);
+  }
+  // The on-disk footprint of all backing files must stay within the capacity.
+  if (bytes_on_disk <= capacity_) return Status::OK();
+  return Status(Substitute("Partition $0 consumed $1 bytes, exceeding capacity of $2 "
+      "bytes", path_, bytes_on_disk, capacity_));
+}
+
+void DataCache::Partition::ReleaseResources() {
+  std::unique_lock<SpinLock> partition_lock(lock_);
+  if (closed_) return;  // Already released; nothing left to do.
+  closed_ = true;
+  // Destroying the CacheFile objects closes and deletes all backing files.
+  cache_files_.clear();
+  // Free all memory consumed by the metadata cache.
+  meta_cache_.reset();
+}
+
+int64_t DataCache::Partition::Lookup(const CacheKey& cache_key, int64_t bytes_to_read,
+    uint8_t* buffer) {
+  DCHECK(!closed_);
+  Slice key = cache_key.ToSlice();
+  kudu::Cache::Handle* handle =
+      meta_cache_->Lookup(key, kudu::Cache::EXPECT_IN_CACHE);
+  if (handle == nullptr) return 0;  // Cache miss.
+  auto handle_release =
+      MakeScopeExitTrigger([this, &handle]() { meta_cache_->Release(handle); });
+
+  // Read from the backing file, capped at the length of the cached content.
+  CacheEntry entry(meta_cache_->Value(handle));
+  CacheFile* cache_file = entry.file();
+  bytes_to_read = min(entry.len(), bytes_to_read);
+  VLOG(3) << Substitute("Reading file $0 offset $1 len $2 checksum $3 bytes_to_read $4",
+      cache_file->path(), entry.offset(), entry.len(), entry.checksum(), bytes_to_read);
+  if (UNLIKELY(!cache_file->Read(entry.offset(), buffer, bytes_to_read))) {
+    meta_cache_->Erase(key);  // Drop the entry whose backing data could not be read.
+    return 0;
+  }
+
+  // Verify checksum if enabled and the whole entry was read. Erase entry on mismatch.
+  if (FLAGS_data_cache_checksum && bytes_to_read == entry.len() &&
+      !VerifyChecksum("read", entry, buffer, bytes_to_read)) {
+    meta_cache_->Erase(key);
+    return 0;
+  }
+  return bytes_to_read;  // Number of bytes copied into 'buffer'.
+}
+
+bool DataCache::Partition::HandleExistingEntry(const Slice& key,
+    kudu::Cache::Handle* handle, const uint8_t* buffer, int64_t buffer_len) {
+  // Unpack the cache entry.
+  CacheEntry entry(meta_cache_->Value(handle));
+
+  // Try verifying the checksum of the new buffer matches that of the existing entry.
+  // On checksum mismatch, delete the existing entry and don't install the new entry
+  // as it's unclear which one is right.
+  if (FLAGS_data_cache_checksum && buffer_len >= entry.len()) {
+    if (!VerifyChecksum("write", entry, buffer, buffer_len)) {
+      meta_cache_->Erase(key);
+      return true;  // Tells the caller not to insert the new buffer.
+    }
+  }
+  // Returns true (no insertion needed) iff the existing entry is at least as long.
+  return entry.len() >= buffer_len;
+}
+
+bool DataCache::Partition::InsertIntoCache(const Slice& key, CacheFile* cache_file,
+    int64_t insertion_offset, const uint8_t* buffer, int64_t buffer_len) {
+  DCHECK_EQ(insertion_offset % PAGE_SIZE, 0);
+  const int64_t charge_len = BitUtil::RoundUp(buffer_len, PAGE_SIZE);
+
+  // Allocate a cache handle. It is freed on exit unless the insertion completes.
+  kudu::Cache::PendingHandle* pending_handle =
+      meta_cache_->Allocate(key, sizeof(CacheEntry), charge_len);
+  if (UNLIKELY(pending_handle == nullptr)) return false;
+  auto release_pending_handle = MakeScopeExitTrigger([this, &pending_handle]() {
+    if (pending_handle != nullptr) meta_cache_->Free(pending_handle);
+  });
+
+  // Compute checksum if necessary. Kept unsigned to match Checksum() and CacheEntry.
+  uint64_t checksum = FLAGS_data_cache_checksum ? Checksum(buffer, buffer_len) : 0;
+
+  // Write to backing file.
+  VLOG(3) << Substitute("Storing file $0 offset $1 len $2 checksum $3 ",
+      cache_file->path(), insertion_offset, buffer_len, checksum);
+  if (UNLIKELY(!cache_file->Write(insertion_offset, buffer, buffer_len))) {
+    return false;
+  }
+
+  // Insert the new entry into the cache.
+  CacheEntry entry(cache_file, insertion_offset, buffer_len, checksum);
+  memcpy(meta_cache_->MutableValue(pending_handle), &entry, sizeof(CacheEntry));
+  kudu::Cache::Handle* handle = meta_cache_->Insert(pending_handle, this);
+  meta_cache_->Release(handle);
+  pending_handle = nullptr;
+  ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES->Increment(charge_len);
+  return true;
+}
+
+bool DataCache::Partition::Store(const CacheKey& cache_key, const uint8_t* buffer,
+    int64_t buffer_len, bool* start_reclaim) {
+  DCHECK(!closed_);
+  *start_reclaim = false;
+  Slice key = cache_key.ToSlice();
+  const int64_t charge_len = BitUtil::RoundUp(buffer_len, PAGE_SIZE);
+  if (charge_len > capacity_) return false;  // Can never fit in this partition.
+
+  // Check for existing entry.
+  kudu::Cache::Handle* handle = meta_cache_->Lookup(key, kudu::Cache::EXPECT_IN_CACHE);
+  if (handle != nullptr) {
+    auto handle_release =
+        MakeScopeExitTrigger([this, &handle]() { meta_cache_->Release(handle); });
+    if (HandleExistingEntry(key, handle, buffer, buffer_len)) return false;
+  }
+
+  CacheFile* cache_file;
+  int64_t insertion_offset;
+  {
+    std::unique_lock<SpinLock> partition_lock(lock_);
+
+    // Limit the write concurrency to avoid blocking the caller (which could be calling
+    // from the critical path of an IO read) when the cache becomes IO bound due to either
+    // limited memory for page cache or the cache is undersized which leads to eviction.
+    //
+    // TODO: defer the writes to another thread which writes asynchronously. Need to bound
+    // the extra memory consumption for holding the temporary buffer though.
+    const bool exceed_concurrency =
+        pending_insert_set_.size() >= FLAGS_data_cache_write_concurrency;
+    if (exceed_concurrency ||
+        pending_insert_set_.find(key.ToString()) != pending_insert_set_.end()) {
+      ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES->Increment(buffer_len);
+      return false;
+    }
+
+    // Allocate from the most recently created backing file.
+    CHECK(!cache_files_.empty());
+    cache_file = cache_files_.back().get();
+    insertion_offset = cache_file->Allocate(charge_len, partition_lock);
+    // Create and append to a new file if necessary.
+    if (UNLIKELY(insertion_offset < 0)) {
+      if (!CreateCacheFile().ok()) return false;
+      cache_file = cache_files_.back().get();
+      insertion_offset = cache_file->Allocate(charge_len, partition_lock);
+      if (UNLIKELY(insertion_offset < 0)) return false;
+    }
+
+    // Start deleting old files if there are too many opened.
+    *start_reclaim = cache_files_.size() > max_opened_files_;
+
+    // Do this last. At this point, we are committed to inserting 'key' into the cache.
+    pending_insert_set_.emplace(key.ToString());
+  }
+
+  // Set up a scoped exit to always remove entry from the pending insertion set.
+  auto remove_from_pending_set = MakeScopeExitTrigger([this, &key]() {
+    std::unique_lock<SpinLock> partition_lock(lock_);
+    pending_insert_set_.erase(key.ToString());
+  });
+
+  // Try inserting into the cache. The partition lock is not held during the write.
+  return InsertIntoCache(key, cache_file, insertion_offset, buffer, buffer_len);
+}
+
+void DataCache::Partition::DeleteOldFiles() {
+  std::unique_lock<SpinLock> partition_lock(lock_);
+  DCHECK_GE(oldest_opened_file_, 0);
+  // 'cache_files_' also holds the already deleted files, so 'target' is the index of
+  // the first file to keep open under the per-partition limit which Store() checks.
+  const int target = static_cast<int>(cache_files_.size()) - max_opened_files_;
+  while (oldest_opened_file_ < target) cache_files_[oldest_opened_file_++]->DeleteFile();
+}
+
+void DataCache::Partition::EvictedEntry(Slice key, Slice value) {
+  if (closed_) return;  // Nothing to reclaim once the partition is closed.
+  // Unpack the cache entry.
+  CacheEntry entry(value);
+  int64_t eviction_len = BitUtil::RoundUp(entry.len(), PAGE_SIZE);  // Whole pages.
+  DCHECK_EQ(entry.offset() % PAGE_SIZE, 0);
+  entry.file()->PunchHole(entry.offset(), eviction_len);
+  ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES->Increment(-eviction_len);
+}
+
+// TODO: Switch to using CRC32 once we fix the TODO in hash-util.h
+uint64_t DataCache::Partition::Checksum(const uint8_t* buffer, int64_t buffer_len) {
+  return HashUtil::FastHash64(buffer, buffer_len, 0xcafebeef);  // Fixed constant.
+}
+
+bool DataCache::Partition::VerifyChecksum(const string& ops_name, const CacheEntry& entry,
+    const uint8_t* buffer, int64_t buffer_len) {
+  DCHECK(FLAGS_data_cache_checksum);
+  DCHECK_GE(buffer_len, entry.len());
+  const uint64_t checksum = Checksum(buffer, entry.len());  // Only the entry's length.
+  if (UNLIKELY(checksum != entry.checksum())) {
+    LOG(DFATAL) << Substitute("Checksum mismatch during $0 for file $1 "
+        "offset: $2 len: $3 buffer len: $4. Expected $5, Got $6.", ops_name,
+        entry.file()->path(), entry.offset(), entry.len(), buffer_len, entry.checksum(),
+        checksum);
+    return false;
+  }
+  return true;
+}
+
+Status DataCache::Init() {
+  // Verifies all the configured flags are sane.
+  if (FLAGS_data_cache_file_max_size_bytes < PAGE_SIZE) {
+    return Status(Substitute("Misconfigured --data_cache_file_max_size_bytes: $0 bytes. "
+        "Must be at least $1 bytes", FLAGS_data_cache_file_max_size_bytes, PAGE_SIZE));
+  }
+  if (FLAGS_data_cache_write_concurrency < 1) {
+    return Status(Substitute("Misconfigured --data_cache_write_concurrency: $0. "
+        "Must be at least 1.", FLAGS_data_cache_write_concurrency));
+  }
+
+  // Expected configuration string: dir1,dir2,..,dirN:capacity, e.g. /tmp/d1,/tmp/d2:1TB
+  vector<string> all_cache_configs = Split(config_, ":", SkipEmpty());
+  if (all_cache_configs.size() != 2) {
+    return Status(Substitute("Malformed data cache configuration $0", config_));
+  }
+
+  // Parse the capacity string to make sure it's well-formed.
+  bool is_percent;
+  int64_t capacity = ParseUtil::ParseMemSpec(all_cache_configs[1], &is_percent, 0);
+  if (is_percent) {
+    return Status(Substitute("Malformed data cache capacity configuration $0",
+        all_cache_configs[1]));
+  }
+  if (capacity < PAGE_SIZE) {
+    return Status(Substitute("Configured data cache capacity $0 is too small",
+        all_cache_configs[1]));
+  }
+
+  set<string> cache_dirs;
+  SplitStringToSetUsing(all_cache_configs[0], ",", &cache_dirs);
+  if (cache_dirs.empty()) {  // E.g. a list of only ","; avoids dividing by zero below.
+    return Status(Substitute("Malformed data cache configuration $0", config_));
+  }
+  int max_opened_files_per_partition =
+      FLAGS_data_cache_max_opened_files / static_cast<int>(cache_dirs.size());
+  if (max_opened_files_per_partition < 1) {  // Signed math: rejects negative flags too.
+    return Status(Substitute("Misconfigured --data_cache_max_opened_files: $0. Must be "
+        "at least $1.", FLAGS_data_cache_max_opened_files, cache_dirs.size()));
+  }
+  for (const string& dir_path : cache_dirs) {
+    LOG(INFO) << "Adding partition " << dir_path << " with capacity "
+              << PrettyPrinter::PrintBytes(capacity);
+    std::unique_ptr<Partition> partition =
+        make_unique<Partition>(dir_path, capacity, max_opened_files_per_partition);
+    RETURN_IF_ERROR(partition->Init());
+    partitions_.emplace_back(move(partition));
+  }
+  CHECK_GT(partitions_.size(), 0);
+
+  // Start the thread pool which deletes old files. DataCache::Store() enqueues a
+  // partition index once that partition exceeds its per-partition file limit; files are
+  // then closed in creation order until the partition is within the limit again.
+  file_deleter_pool_.reset(new ThreadPool<int>("impala-server",
+      "data-cache-file-deleter", 1, MAX_FILE_DELETER_QUEUE_SIZE,
+      bind<void>(&DataCache::DeleteOldFiles, this, _1, _2)));
+  RETURN_IF_ERROR(file_deleter_pool_->Init());
+  return Status::OK();
+}
+
+void DataCache::ReleaseResources() {
+  if (file_deleter_pool_) file_deleter_pool_->Shutdown();  // Pool is created in Init().
+  for (auto& partition : partitions_) partition->ReleaseResources();
+}
+
+int64_t DataCache::Lookup(const string& filename, int64_t mtime, int64_t offset,
+    int64_t bytes_to_read, uint8_t* buffer) {
+  DCHECK(!partitions_.empty());
+
+  // Construct a cache key. The cache key is also hashed to compute the partition index.
+  const CacheKey key(filename, mtime, offset);
+  int idx = key.Hash() % partitions_.size();
+  int64_t bytes_read = partitions_[idx]->Lookup(key, bytes_to_read, buffer);
+  if (VLOG_IS_ON(3)) {  // Only build the message when verbose logging is enabled.
+    stringstream ss;
+    ss << std::hex << reinterpret_cast<int64_t>(buffer);
+    LOG(INFO) << Substitute("Looking up $0 mtime: $1 offset: $2 bytes_to_read: $3 "
+        "buffer: 0x$4 bytes_read: $5", filename, mtime, offset, bytes_to_read,
+        ss.str(), bytes_read);
+  }
+  return bytes_read;  // Number of bytes copied into 'buffer'; 0 on a miss.
+}
+
+bool DataCache::Store(const string& filename, int64_t mtime, int64_t offset,
+    const uint8_t* buffer, int64_t buffer_len) {
+  DCHECK(!partitions_.empty());
+
+  // Construct a cache key. The cache key is also hashed to compute the partition index.
+  const CacheKey key(filename, mtime, offset);
+  int idx = key.Hash() % partitions_.size();
+  bool start_reclaim;  // Always set by Partition::Store().
+  bool stored = partitions_[idx]->Store(key, buffer, buffer_len, &start_reclaim);
+  if (VLOG_IS_ON(3)) {
+    stringstream ss;
+    ss << std::hex << reinterpret_cast<int64_t>(buffer);
+    LOG(INFO) << Substitute("Storing $0 mtime: $1 offset: $2 buffer_len: $3 "
+        "buffer: 0x$4 stored: $5", filename, mtime, offset, buffer_len, ss.str(), stored);
+  }
+  if (start_reclaim) file_deleter_pool_->Offer(idx);  // Old files are deleted async.
+  return stored;
+}
+
+Status DataCache::CloseFilesAndVerifySizes() {
+  for (auto& partition : partitions_) {  // Stops at the first partition which fails.
+    RETURN_IF_ERROR(partition->CloseFilesAndVerifySizes());
+  }
+  return Status::OK();
+}
+
+void DataCache::DeleteOldFiles(uint32_t thread_id, int partition_idx) {
+  DCHECK_LT(partition_idx, partitions_.size());  // 'thread_id' is not used.
+  partitions_[partition_idx]->DeleteOldFiles();
+}
+
+} // namespace io
+} // namespace impala
diff --git a/be/src/runtime/io/data-cache.h b/be/src/runtime/io/data-cache.h
new file mode 100644
index 0000000..b1bf99d
--- /dev/null
+++ b/be/src/runtime/io/data-cache.h
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <mutex>
+#include <string>
+#include <unistd.h>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "common/status.h"
+#include "util/spinlock.h"
+#include "util/thread-pool.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+class Cache;
+} // kudu
+
+/// This class is an implementation of an IO data cache which is backed by local storage.
+/// It implicitly relies on the OS page cache management to shuffle data between memory
+/// and the storage device. This is useful for caching data read from remote filesystems
+/// (e.g. remote HDFS data node, S3, ABFS, ADLS).
+///
+/// A data cache is divided into one or more partitions based on the configuration
+/// string which specifies a list of directories and their corresponding storage quotas.
+///
+/// Each partition has a meta-data cache which tracks the mappings of cache keys to
+/// the locations of the cached data. A cache key is a tuple of (file's name, file's
+/// modification time, file offset) and a cache entry is a tuple of (backing file,
+/// offset in the backing file, length of the cached data, optional checksum). Each
+/// partition stores its set of cached data in backing files created on local storage.
+/// When inserting new data into the cache, the data is appended to the current backing
+/// file in use. The storage consumption of each cache entry counts towards the quota of
+/// that partition. When a partition reaches its capacity, the least recently used data
+/// in that partition is evicted. Evicted data is removed from the underlying storage by
+/// punching holes in the backing file it's stored in. As a backing file reaches a certain
+/// size (e.g. 4TB), new data will stop being appended to it and a new file will be
+/// created instead. Note that due to hole punching, the backing file is actually sparse.
+/// For instance, a backing file may look like the following after some insertion and
+/// eviction. All the holes in file consume no storage space at all.
+///
+/// 0                                                                             1GB
+/// +----------+----------+----------+-----------------+---------+---------+-------+
+/// |          |          |          |                 |         |         |       |
+/// |   Data   |   Hole   |   Data   |       Hole      |  Data   |  Hole   |  Data |
+/// |          |          |          |                 |         |         |       |
+/// +----------+----------+----------+-----------------+---------+---------+-------+
+///                                                                                ^
+///                                                                    insertion offset
+///
+/// Optionally, checksumming can be enabled to verify read from the cache is consistent
+/// with what was inserted and to verify that multiple attempted insertions with the same
+/// cache key have the same cache content.
+///
+/// Note that the cache currently doesn't support sub-range lookups or handle
+/// overlapping ranges. In other words, if the cache has an entry for a file at range
+/// [0,4095], a look up for range [4000,4095] will result in a miss even though it's a
+/// sub-range of [0,4095]. Also inserting the range [4000,4095] will not consolidate
+/// with any overlapping ranges. In other words, inserting entries for ranges [0,4095]
+/// and [4000,4095] will result in caching the data for range [4000,4095] twice. This
+/// hasn't been a major concern in practice when testing with TPC-DS + parquet but this
+/// requires more investigation for other file formats and workloads.
+///
+/// To probe for cached data in the cache, the interface Lookup() is used; To insert
+/// data into the cache, the interface Store() is used. Write to the backing file and
+/// eviction from it happen synchronously. Currently, Store() is limited to the
+/// concurrency of one thread per partition to prevent slowing down the caller in case
+/// the cache is thrashing and it becomes IO bound. The write concurrency can be tuned
+/// via the knob --data_cache_write_concurrency. Also, Store() has a minimum granularity
+/// of 4KB so any data inserted will be rounded up to the nearest multiple of 4KB.
+///
+/// The number of backing files in all partitions is bound by
+/// --data_cache_max_opened_files. Once the number of files exceeds that set limit, files
+/// are closed and deleted asynchronously by thread in 'file_deleter_pool_'. Stale cache
+/// entries which reference deleted files are erased lazily upon the next access or
+/// indirectly via eviction.
+///
+/// Future work:
+/// - investigate the overlapping ranges support
+/// - be more selective on what to cache
+/// - asynchronous eviction
+/// - better data placement: put hot data on faster media and lukewarm data on slower
+///   storage media
+/// - evaluate the option of exposing the cache via mmap() and pinning similar to HDFS
+///   caching. This has the advantage of not needing to copy out the data but pinning
+///   may complicate the code.
+///
+
+namespace impala {
+namespace io {
+
+class DataCache {
+ public:
+
+  /// 'config' is the configuration string of the form <dir1>,...,<dirN>:<quota>
+  /// in which <dir1>, <dirN> are part of a list of directories for storing cached data
+  /// and each directory corresponds to a cache partition. <quota> is the storage quota
+  /// for each directory. Impala daemons running on the same host will not share any
+  /// caching directories. 'config' is copied into 'config_'; nothing is parsed until
+  /// Init() is called.
+  explicit DataCache(const std::string& config) : config_(config) { }
+
+  ~DataCache() { ReleaseResources(); }
+
+  /// Parses the configuration string, initializes all partitions in the cache by
+  /// checking for storage space available and creates a backing file for caching.
+  /// Return error if any of the partitions failed to be initialized.
+  Status Init();
+
+  /// Releases any resources (e.g. backing files) consumed by all partitions.
+  void ReleaseResources();
+
+  /// Looks up a cached entry and copies any cached content from the cache into 'buffer'.
+  /// (filename, mtime, offset) forms a cache key. Please note that sub-range lookup is
+  /// currently not supported. See header comments for details.
+  ///
+  /// 'filename'      : name of the requested file
+  /// 'mtime'         : the modification time of the requested file
+  /// 'offset'        : starting offset of the requested region in the file
+  /// 'bytes_to_read' : number of bytes to be read from the cache
+  /// 'buffer'        : output buffer to be written into on cache hit
+  ///
+  /// Returns the number of bytes read from the cache on cache hit; Returns 0 otherwise.
+  ///
+  int64_t Lookup(const std::string& filename, int64_t mtime, int64_t offset,
+      int64_t bytes_to_read, uint8_t* buffer);
+
+  /// Inserts a new cache entry by copying the content in 'buffer' into the cache.
+  /// (filename, mtime, offset) together forms a cache key. Insertion involves writing
+  /// to the backing file and potentially evicting entries synchronously so callers
+  /// may want to avoid holding locks while calling this function.
+  ///
+  /// 'filename'      : name of the file being inserted
+  /// 'mtime'         : the modification time of the file being inserted.
+  /// 'offset'        : the starting offset of the region in the file being inserted
+  /// 'buffer'        : buffer holding the data to be inserted
+  /// 'buffer_len'    : size of 'buffer'
+  ///
+  /// The cache key is hashed and the resulting hash determines the partition to use.
+  ///
+  /// Please note that 'buffer_len' is rounded up to the nearest multiple of 4KB when
+  /// it's being written to the backing file. This ensures that every cache entry starts
+  /// at a 4KB offset in the backing file, making hole punching easier as the entire page
+  /// can be reclaimed.
+  ///
+  /// An entry may not be installed for various reasons:
+  /// - an entry with the given cache key already exists unless 'buffer_len' is larger
+  ///   than the existing entry, in which case, the entry will be replaced with the
+  ///   new data.
+  /// - a pending entry with the same key is already being installed.
+  /// - the maximum write concurrency (via --data_cache_write_concurrency) is reached.
+  /// - IO error when writing to the backing file.
+  ///
+  /// Returns true iff the entry is installed successfully.
+  ///
+  bool Store(const std::string& filename, int64_t mtime, int64_t offset,
+      const uint8_t* buffer, int64_t buffer_len);
+
+  /// Utility function to verify that all partitions' consumption don't exceed their
+  /// quotas. Return error status if checking files' sizes failed or if the total space
+  /// consumed by a partition exceeded its capacity. Will close the backing files of
+  /// partitions before verifying their sizes. Used by test only.
+  Status CloseFilesAndVerifySizes();
+
+ private:
+  class CacheFile;
+  struct CacheKey;
+  class CacheEntry;
+
+  /// An implementation of a cache partition. Each partition maintains its own set of
+  /// cache keys in a LRU cache.
+  class Partition : public kudu::Cache::EvictionCallback {
+   public:
+    /// Creates a partition at the given directory 'path' with quota 'capacity' in bytes.
+    /// 'max_opened_files' is the maximum number of opened files allowed per partition.
+    Partition(const std::string& path, int64_t capacity, int max_opened_files);
+
+    ~Partition();
+
+    /// Initializes the current partition:
+    /// - verifies if the specified directory is valid
+    /// - removes any stale backing file in this partition
+    /// - checks if there is enough storage space
+    /// - checks if the filesystem supports hole punching
+    /// - creates an empty backing file.
+    ///
+    /// Returns error if any of the above steps fails. Returns OK otherwise.
+    Status Init();
+
+    /// Close and delete all backing files created for this partition. Also releases
+    /// the memory held by the metadata cache.
+    void ReleaseResources();
+
+    /// Looks up in the meta-data cache with key 'cache_key'. If found, try copying
+    /// 'bytes_to_read' bytes from the backing file into 'buffer'. Returns number
+    /// of bytes read from the cache. Returns 0 if there is a cache miss.
+    int64_t Lookup(const CacheKey& cache_key, int64_t bytes_to_read, uint8_t* buffer);
+
+    /// Inserts an entry with key 'cache_key' and data in 'buffer' into the cache.
+    /// 'buffer_len' is the length of buffer. 'start_reclaim' is set to true if
+    /// the number of backing files exceeds the per partition limit. Returns true if
+    /// the entry is inserted. Returns false otherwise.
+    bool Store(const CacheKey& cache_key, const uint8_t* buffer, int64_t buffer_len,
+        bool* start_reclaim);
+
+    /// Callback invoked when evicting an entry from the cache. 'key' is the cache key
+    /// of the entry being evicted and 'value' contains the cache entry which is the
+    /// meta-data of where the cached data is stored.
+    virtual void EvictedEntry(kudu::Slice key, kudu::Slice value) override;
+
+    /// Utility function to verify that the backing files don't exceed the capacity
+    /// of the partition. Used by test only.
+    ///
+    /// Return error status if:
+    /// - getting files' sizes failed
+    /// - total space consumed exceeded the partition's capacity
+    /// - file's size exceeded the specified limit in --data_cache_file_max_size
+    ///
+    /// Will close the backing files of partitions before verifying their sizes.
+    ///
+    /// Returns OK otherwise.
+    Status CloseFilesAndVerifySizes();
+
+    /// Deletes old backing files until number of backing files is no larger than
+    /// --data_cache_max_opened_files.
+    void DeleteOldFiles();
+
+   private:
+    /// The directory path which this partition stores cached data in.
+    const std::string path_;
+
+    /// The capacity in bytes of this partition.
+    const int64_t capacity_;
+
+    /// Maximum number of opened files allowed in a partition.
+    const int max_opened_files_;
+
+    /// True if this partition has been closed. Expected to be set after all IO
+    /// threads have been joined.
+    bool closed_ = false;
+
+    /// The prefix of the names of the cache backing files.
+    static const char* CACHE_FILE_PREFIX;
+
+    /// Protects the following fields.
+    SpinLock lock_;
+
+    /// Index into 'cache_files_' of the oldest opened file.
+    int oldest_opened_file_ = -1;
+
+    /// The set of backing files used by this partition. By default, cache_files_.back()
+    /// is the latest backing file to which new data is appended. Must be accessed with
+    /// 'lock_' held.
+    std::vector<std::unique_ptr<CacheFile>> cache_files_;
+
+    /// This set tracks cache keys of entries in progress of being inserted into the
+    /// cache. As we don't hold locks while writing to the backing file, this set is
+    /// used to prevent multiple insertion into the cache with the same cache key.
+    /// The insertion path will check against this set and if the entry doesn't already
+    /// exist, it will insert one into this set. Upon completion of cache insertion,
+    /// the entry will be removed from this set. Must be accessed with 'lock_' held.
+    std::unordered_set<std::string> pending_insert_set_;
+
+    /// The LRU cache for tracking the cache key to cache entries mappings.
+    ///
+    /// A cache key is created by calling the constructor of CacheKey, which is a tuple
+    /// of (fname, mtime, offset).
+    ///
+    /// A cache entry has type CacheEntry and it contains the metadata of the cached
+    /// content. Please see comments at CacheEntry for details.
+    std::unique_ptr<kudu::Cache> meta_cache_;
+
+    /// Utility function for creating a new backing file in 'path_'. The cache
+    /// partition's lock needs to be held when calling this function. Returns
+    /// error on failure.
+    Status CreateCacheFile();
+
+    /// Utility function to delete cache files left over from previous runs of Impala.
+    /// Returns error on failure.
+    Status DeleteExistingFiles() const;
+
+    /// Utility function for computing the checksum of 'buffer' with length 'buffer_len'.
+    static uint64_t Checksum(const uint8_t* buffer, int64_t buffer_len);
+
+    /// Helper function which handles the case in which the key to be inserted already
+    /// exists in the cache. With checksumming enabled, it also verifies that the content
+    /// in 'buffer' matches the expected checksum in the cache's metadata. Please note
+    /// that an existing entry may be overwritten if 'buffer_len' is larger than the
+    /// length of the existing entry. 'handle' is the handle into the metadata cache.
+    /// Needed for referencing the cache entry.
+    ///
+    /// Returns true iff the existing entry already covers the range of 'buffer' so no
+    /// work needs to be done. Returns false otherwise. In which case, the existing entry
+    /// will be overwritten.
+    bool HandleExistingEntry(const kudu::Slice& key, kudu::Cache::Handle* handle,
+        const uint8_t* buffer, int64_t buffer_len);
+
+    /// Helper function to insert a new entry with key 'key' into the LRU cache.
+    /// The content in 'buffer' of length 'buffer_len' in bytes will be written to
+    /// the backing file 'cache_file' at offset 'insertion_offset'.
+    ///
+    /// Returns true iff the insertion into the cache and the write to the backing file
+    /// succeeded. Returns false otherwise.
+    bool InsertIntoCache(const kudu::Slice& key, CacheFile* cache_file,
+        int64_t insertion_offset, const uint8_t* buffer, int64_t buffer_len);
+
+    /// Utility function for verifying that the checksum of 'buffer' with length
+    /// 'buffer_len' matches the checksum recorded in the meta-data 'entry->checksum'.
+    ///
+    /// 'ops_name' is the name of the operation which triggers the checksum verification.
+    /// Currently, it's either "read" or "write" but future changes may add more names.
+    ///
+    /// Returns false if the checksum of 'buffer' doesn't match 'entry->checksum'.
+    static bool VerifyChecksum(const std::string& ops_name, const CacheEntry& entry,
+        const uint8_t* buffer, int64_t buffer_len);
+  };
+
+  /// The configuration string for the data cache.
+  const std::string config_;
+
+  /// The set of all cache partitions.
+  std::vector<std::unique_ptr<Partition>> partitions_;
+
+  /// Thread pool for deleting old files from partitions to keep the number of opened
+  /// files within --data_cache_max_opened_files. This allows deletion requests
+  /// to be queued for deferred processing. There is only one thread in this pool.
+  std::unique_ptr<ThreadPool<int>> file_deleter_pool_;
+
+  /// Thread function called by threads in 'file_deleter_pool_' for deleting old files
+  /// in partitions_[partition_idx].
+  void DeleteOldFiles(uint32_t thread_id, int partition_idx);
+
+};
+
+} // namespace io
+} // namespace impala
+
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 394745f..0edc7e6 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -20,6 +20,7 @@
 #include "common/global-flags.h"
 #include "common/thread-debug-info.h"
 #include "runtime/exec-env.h"
+#include "runtime/io/data-cache.h"
 #include "runtime/io/disk-io-mgr-internal.h"
 #include "runtime/io/handle-cache.inline.h"
 #include "runtime/io/error-converter.h"
@@ -51,6 +52,13 @@ DEFINE_int32(num_disks, 0, "Number of disks on data node.");
 // Default IoMgr configs:
 // The maximum number of the threads per disk is also the max queue depth per disk.
 DEFINE_int32(num_threads_per_disk, 0, "Number of I/O threads per disk");
+// Data cache configuration
+DEFINE_string(data_cache, "", "The configuration string for IO data cache. "
+    "Default to be an empty string so it's disabled. The configuration string is "
+    "expected to be a list of directories, separated by ',', followed by a ':' and "
+    "a capacity quota per directory. For example /data/0,/data/1:1TB means the cache "
+    "may use up to 2TB, with 1TB max in /data/0 and /data/1 respectively. Please note "
+    "that each Impala daemon on a host must have a unique caching directory.");
 
 // Rotational disks should have 1 thread per disk to minimize seeks.  Non-rotational
 // don't have this penalty and benefit from multiple concurrent IO requests.
@@ -228,6 +236,7 @@ DiskIoMgr::~DiskIoMgr() {
   disk_thread_group_.JoinAll();
   for (DiskQueue* disk_queue : disk_queues_) delete disk_queue;
   if (cached_read_options_ != nullptr) hadoopRzOptionsFree(cached_read_options_);
+  if (remote_data_cache_) remote_data_cache_->ReleaseResources();
 }
 
 Status DiskIoMgr::Init() {
@@ -279,6 +288,10 @@ Status DiskIoMgr::Init() {
   ret = hadoopRzOptionsSetByteBufferPool(cached_read_options_, nullptr);
   DCHECK_EQ(ret, 0);
 
+  if (!FLAGS_data_cache.empty()) {
+    remote_data_cache_.reset(new DataCache(FLAGS_data_cache));
+    RETURN_IF_ERROR(remote_data_cache_->Init());
+  }
   return Status::OK();
 }
 
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index ac93c75..c08e17d 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -38,7 +38,9 @@ namespace impala {
 
 namespace io {
 
+class DataCache;
 class DiskQueue;
+
 /// Manager object that schedules IO for all queries on all disks and remote filesystems
 /// (such as S3). Each query maps to one or more RequestContext objects, each of which
 /// has its own queue of scan ranges and/or write ranges.
@@ -189,6 +191,13 @@ class DiskQueue;
 /// be CPU bottlenecked especially if not enough I/O threads for these queues are
 /// started.
 ///
+/// Remote filesystem data caching:
+/// To reduce latency and avoid being network bound when reading from remote filesystems,
+/// a data cache can be optionally enabled (via --data_cache) for caching data read
+/// for remote scan ranges on local storage. The cache is independent of file formats.
+/// It's merely caching chunks of file blocks directly on local storage to avoid
+/// fetching them over network. Please see data-cache.h for details.
+///
 /// TODO: We should implement more sophisticated resource management. Currently readers
 /// are the unit of scheduling and we attempt to distribute IOPS between them. Instead
 /// it would be better to have policies based on queries, resource pools, etc.
@@ -358,6 +367,8 @@ class DiskIoMgr : public CacheLineAligned {
   /// is something invalid about the scan range.
   Status ValidateScanRange(ScanRange* range) WARN_UNUSED_RESULT;
 
+  DataCache* remote_data_cache() { return remote_data_cache_.get(); }
+
  private:
   DISALLOW_COPY_AND_ASSIGN(DiskIoMgr);
   friend class DiskIoMgrTest_Buffers_Test;
@@ -428,6 +439,11 @@ class DiskIoMgr : public CacheLineAligned {
   /// Helper for AllocateBuffersForRange() to compute the buffer sizes for a scan range
   /// with length 'scan_range_len', given that 'max_bytes' of memory should be allocated.
   std::vector<int64_t> ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes);
+
+  /// Singleton IO data cache for remote reads. If configured, it will be probed for all
+  /// non-local reads and data read from remote data nodes will be stored in it. If not
+  /// configured, this would be NULL.
+  std::unique_ptr<DataCache> remote_data_cache_;
 };
 }
 }
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index 0bbf984..d83887c 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -18,18 +18,23 @@
 #include <algorithm>
 
 #include "gutil/strings/substitute.h"
+#include "runtime/io/data-cache.h"
 #include "runtime/io/disk-io-mgr-internal.h"
 #include "runtime/io/hdfs-file-reader.h"
 #include "runtime/io/request-context.h"
 #include "runtime/io/request-ranges.h"
 #include "util/hdfs-util.h"
 #include "util/impalad-metrics.h"
+#include "util/scope-exit-trigger.h"
 
 #include "common/names.h"
 
 DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRead() "
     "when performing HDFS read operations. This is necessary to use HDFS hedged reads "
     "(assuming the HDFS client is configured to do so).");
+DEFINE_bool(always_use_data_cache, false, "(Advanced) Always uses the IO data cache "
+    "for all reads, regardless of whether the read is local or remote. By default, the "
+    "IO data cache is only used if the data is expected to be remote. Used by tests.");
 
 #ifndef NDEBUG
 DECLARE_int32(stress_disk_read_delay_ms);
@@ -98,12 +103,31 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
         scan_range_->mtime(), request_context, &borrowed_hdfs_fh));
     hdfs_file = borrowed_hdfs_fh->file();
   }
+  // Make sure to release any borrowed file handle.
+  auto release_borrowed_hdfs_fh = MakeScopeExitTrigger([this, &borrowed_hdfs_fh]() {
+    if (borrowed_hdfs_fh != nullptr) {
+      scan_range_->io_mgr_->ReleaseCachedHdfsFileHandle(scan_range_->file_string(),
+          borrowed_hdfs_fh);
+    }
+  });
 
   int64_t max_chunk_size = scan_range_->MaxReadChunkSize();
   Status status = Status::OK();
   {
     ScopedTimer<MonotonicStopWatch> req_context_read_timer(
         scan_range_->reader_->read_timer_);
+
+    // If it's a remote scan range, try reading from the remote data cache.
+    DataCache* remote_data_cache = io_mgr->remote_data_cache();
+    bool try_cache = (!expected_local_ || FLAGS_always_use_data_cache) &&
+        remote_data_cache != nullptr;
+    int64_t cached_read = 0;
+    if (try_cache) {
+      cached_read = ReadDataCache(remote_data_cache, file_offset, buffer, bytes_to_read);
+      DCHECK_GE(cached_read, 0);
+      *bytes_read = cached_read;
+    }
+
     while (*bytes_read < bytes_to_read) {
       int chunk_size = min(bytes_to_read - *bytes_read, max_chunk_size);
       DCHECK_GT(chunk_size, 0);
@@ -116,8 +140,7 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
       // ReadFromPosInternal() might fail due to a bad file handle.
       // If that was the case, allow for a retry to fix it.
       status = ReadFromPosInternal(hdfs_file, position_in_file,
-          borrowed_hdfs_fh != nullptr, buffer + *bytes_read, chunk_size,
-          &current_bytes_read);
+          buffer + *bytes_read, chunk_size, &current_bytes_read);
 
       // Retry if:
       // - first read was not successful
@@ -131,10 +154,12 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
                 scan_range_->file_string(), scan_range_->mtime(),
                 request_context, &borrowed_hdfs_fh));
         hdfs_file = borrowed_hdfs_fh->file();
+        VLOG_FILE << "Reopening file " << scan_range_->file_string()
+                  << " with mtime " << scan_range_->mtime()
+                  << " offset " << file_offset;
         req_context_read_timer.Start();
         status = ReadFromPosInternal(hdfs_file, position_in_file,
-            borrowed_hdfs_fh != nullptr, buffer + *bytes_read, chunk_size,
-            &current_bytes_read);
+            buffer + *bytes_read, chunk_size, &current_bytes_read);
       }
       if (!status.ok()) {
         break;
@@ -150,16 +175,20 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
       // Collect and accumulate statistics
       GetHdfsStatistics(hdfs_file);
     }
-  }
 
-  if (borrowed_hdfs_fh != nullptr) {
-    io_mgr->ReleaseCachedHdfsFileHandle(scan_range_->file_string(), borrowed_hdfs_fh);
+    int64_t cached_bytes_missed = *bytes_read - cached_read;
+    if (try_cache && status.ok() && cached_bytes_missed > 0) {
+      DCHECK_LE(*bytes_read, bytes_to_read);
+      WriteDataCache(remote_data_cache, file_offset, buffer, *bytes_read,
+          cached_bytes_missed);
+    }
   }
+
   return status;
 }
 
 Status HdfsFileReader::ReadFromPosInternal(hdfsFile hdfs_file, int64_t position_in_file,
-    bool is_borrowed_fh, uint8_t* buffer, int64_t chunk_size, int* bytes_read) {
+    uint8_t* buffer, int64_t chunk_size, int* bytes_read) {
   // For file handles from the cache, any of the below file operations may fail
   // due to a bad file handle.
   if (FLAGS_use_hdfs_pread) {
@@ -170,9 +199,15 @@ Status HdfsFileReader::ReadFromPosInternal(hdfsFile hdfs_file, int64_t position_
               *scan_range_->file_string()));
     }
   } else {
-    // If the file handle is borrowed, it may not be at the appropriate
-    // location. Seek to the appropriate location.
-    if (is_borrowed_fh) {
+    const int64_t cur_offset = hdfsTell(hdfs_fs_, hdfs_file);
+    if (cur_offset == -1) {
+      return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
+          Substitute("Error getting current offset of file $0: $1",
+              *scan_range_->file_string(), GetHdfsErrorMsg("")));
+    }
+    // If the file handle is borrowed or if we had a cache hit earlier, it may not be
+    // at the appropriate location. Seek to the appropriate location.
+    if (cur_offset != position_in_file) {
       if (hdfsSeek(hdfs_fs_, hdfs_file, position_in_file) != 0) {
         return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
             Substitute("Error seeking to $0 in file: $1: $2",
@@ -207,6 +242,35 @@ void HdfsFileReader::CachedFile(uint8_t** data, int64_t* length) {
   *length = hadoopRzBufferLength(cached_buffer_);
 }
 
+// Probes 'remote_data_cache' for the range starting at 'file_offset' of the file
+// described by 'scan_range_' and copies any cached bytes into 'buffer'. Returns the
+// value returned by DataCache::Lookup(). When it is positive, the hit metrics are
+// updated: a lookup returning exactly 'bytes_to_read' counts as a hit, anything
+// shorter counts as a partial hit.
+int64_t HdfsFileReader::ReadDataCache(DataCache* remote_data_cache, int64_t file_offset,
+    uint8_t* buffer, int64_t bytes_to_read) {
+  int64_t cached_read = remote_data_cache->Lookup(*scan_range_->file_string(),
+      scan_range_->mtime(), file_offset, bytes_to_read, buffer);
+  if (LIKELY(cached_read > 0)) {
+    // The per-request data cache counters default to nullptr and are only populated
+    // through the RequestContext setters, so skip any counter which hasn't been set
+    // instead of dereferencing a null pointer.
+    RuntimeProfile::Counter* hit_bytes_counter =
+        scan_range_->reader_->data_cache_hit_bytes_counter_;
+    if (hit_bytes_counter != nullptr) hit_bytes_counter->Add(cached_read);
+
+    RuntimeProfile::Counter* hit_counter = nullptr;
+    if (LIKELY(cached_read == bytes_to_read)) {
+      hit_counter = scan_range_->reader_->data_cache_hit_counter_;
+    } else {
+      hit_counter = scan_range_->reader_->data_cache_partial_hit_counter_;
+    }
+    if (hit_counter != nullptr) hit_counter->Add(1);
+
+    ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES->Increment(cached_read);
+  }
+  return cached_read;
+}
+
+// Offers 'buffer' of length 'buffer_len', read from 'file_offset' of the file described
+// by 'scan_range_', to 'remote_data_cache' and records 'bytes_missed' in the miss
+// metrics. The metrics are updated regardless of whether the insertion succeeded.
+void HdfsFileReader::WriteDataCache(DataCache* remote_data_cache, int64_t file_offset,
+    const uint8_t* buffer, int64_t buffer_len, int64_t bytes_missed) {
+  // Intentionally leave out the return value as cache insertion is opportunistic.
+  // It can fail for various reasons:
+  // - multiple threads inserting the same entry at the same time
+  // - the entry is too large to be accommodated in the cache
+  remote_data_cache->Store(*scan_range_->file_string(), scan_range_->mtime(),
+      file_offset, buffer, buffer_len);
+
+  // The per-request data cache counters default to nullptr and are only populated
+  // through the RequestContext setters, so skip any counter which hasn't been set
+  // instead of dereferencing a null pointer.
+  RuntimeProfile::Counter* miss_bytes_counter =
+      scan_range_->reader_->data_cache_miss_bytes_counter_;
+  if (miss_bytes_counter != nullptr) miss_bytes_counter->Add(bytes_missed);
+  RuntimeProfile::Counter* miss_counter = scan_range_->reader_->data_cache_miss_counter_;
+  if (miss_counter != nullptr) miss_counter->Add(1);
+
+  ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES->Increment(bytes_missed);
+}
+
 void HdfsFileReader::Close() {
   unique_lock<SpinLock> hdfs_lock(lock_);
   if (exclusive_hdfs_fh_ != nullptr) {
diff --git a/be/src/runtime/io/hdfs-file-reader.h b/be/src/runtime/io/hdfs-file-reader.h
index f8dbf93..01a0394 100644
--- a/be/src/runtime/io/hdfs-file-reader.h
+++ b/be/src/runtime/io/hdfs-file-reader.h
@@ -23,6 +23,8 @@
 namespace impala {
 namespace io {
 
+class DataCache;
+
 /// File reader class for HDFS.
 class HdfsFileReader : public FileReader {
 public:
@@ -35,16 +37,42 @@ public:
   virtual Status Open(bool use_file_handle_cache) override;
   virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
       int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
-  /// Reads from the DN cache. On success, sets cached_buffer_ to the DN
-  /// buffer and returns a pointer to the underlying raw buffer.
-  /// Returns nullptr if the data is not cached.
-  virtual void CachedFile(uint8_t** data, int64_t* length) override;
   virtual void Close() override;
   virtual void ResetState() override;
   virtual std::string DebugString() const override;
+
+  /// Reads from the DN cache. On success, sets cached_buffer_ to the DN buffer
+  /// and returns a pointer to the underlying raw buffer. 'cached_buffer_' is set to
+  /// nullptr if the data is not cached and 'length' is set to 0.
+  ///
+  /// Please note that this interface is only effective for local HDFS reads as it
+  /// relies on HDFS caching. For remote reads, this interface is not used.
+  virtual void CachedFile(uint8_t** data, int64_t* length) override;
+
 private:
+  /// Probes 'remote_data_cache' for a hit. The requested file's name and mtime
+  /// are stored in 'scan_range_'. 'file_offset' is the offset into the file to read
+  /// and 'bytes_to_read' is the number of bytes requested. On success, copies the
+  /// content of the cache into 'buffer' and returns the number of bytes read.
+  /// Also updates various cache metrics. Returns 0 on cache miss.
+  int64_t ReadDataCache(DataCache* remote_data_cache, int64_t file_offset,
+      uint8_t* buffer, int64_t bytes_to_read);
+
+  /// Inserts into 'remote_data_cache' with 'buffer' which contains the data read
+  /// from a file at 'file_offset'. 'buffer_len' is the length of the buffer in bytes.
+  /// The file's name and mtime are stored in 'scan_range_'. 'cached_bytes_missed' is
+  /// the number of bytes missed in the cache. Used for updating cache metrics.
+  /// No guarantee that the entry is inserted as caching is opportunistic.
+  void WriteDataCache(DataCache* remote_data_cache, int64_t file_offset,
+      const uint8_t* buffer, int64_t buffer_len, int64_t cached_bytes_missed);
+
+  /// Read [position_in_file, position_in_file + chunk_size) from 'hdfs_file'
+  /// into 'buffer'. Update 'bytes_read' on success. Returns error status on
+  /// failure. When not using HDFS pread, this function will always implicitly
+  /// seek to 'position_in_file' if 'hdfs_file' is not at it already.
   Status ReadFromPosInternal(hdfsFile hdfs_file, int64_t position_in_file,
-      bool is_borrowed_fh, uint8_t* buffer, int64_t chunk_size, int* bytes_read);
+      uint8_t* buffer, int64_t chunk_size, int* bytes_read);
+
   void GetHdfsStatistics(hdfsFile hdfs_file);
 
   /// Hadoop filesystem that contains the file being read.
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 4b06516..9d84cb8 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -184,6 +184,26 @@ class RequestContext {
     disks_accessed_bitmap_ = disks_accessed_bitmap;
   }
 
+  void set_data_cache_hit_counter(RuntimeProfile::Counter* counter) {
+    data_cache_hit_counter_ = counter;
+  }
+
+  void set_data_cache_partial_hit_counter(RuntimeProfile::Counter* counter) {
+    data_cache_partial_hit_counter_ = counter;
+  }
+
+  void set_data_cache_miss_counter(RuntimeProfile::Counter* counter) {
+    data_cache_miss_counter_ = counter;
+  }
+
+  void set_data_cache_hit_bytes_counter(RuntimeProfile::Counter* counter) {
+    data_cache_hit_bytes_counter_ = counter;
+  }
+
+  void set_data_cache_miss_bytes_counter(RuntimeProfile::Counter* counter) {
+    data_cache_miss_bytes_counter_ = counter;
+  }
+
   TUniqueId instance_id() const { return instance_id_; }
   void set_instance_id(const TUniqueId& instance_id) {
     instance_id_ = instance_id;
@@ -325,6 +345,13 @@ class RequestContext {
   /// builtin atomic instruction. Probably good enough for now.
   RuntimeProfile::Counter* disks_accessed_bitmap_ = nullptr;
 
+  /// Data cache counters.
+  RuntimeProfile::Counter* data_cache_hit_counter_ = nullptr;
+  RuntimeProfile::Counter* data_cache_partial_hit_counter_ = nullptr;
+  RuntimeProfile::Counter* data_cache_miss_counter_ = nullptr;
+  RuntimeProfile::Counter* data_cache_hit_bytes_counter_ = nullptr;
+  RuntimeProfile::Counter* data_cache_miss_bytes_counter_ = nullptr;
+
   /// Total number of bytes read locally, updated at end of each range scan
   AtomicInt64 bytes_read_local_{0};
 
diff --git a/be/src/util/filesystem-util-test.cc b/be/src/util/filesystem-util-test.cc
index cd081a9..590bf7c 100644
--- a/be/src/util/filesystem-util-test.cc
+++ b/be/src/util/filesystem-util-test.cc
@@ -118,3 +118,39 @@ TEST(FilesystemUtil, Paths) {
   EXPECT_EQ(string("def"), relpath);
 }
 
+// This test exercises the handling of different directory entry types by GetEntryNames().
+TEST(FilesystemUtil, DirEntryTypes) {
+  // Set up a temporary directory containing one subdirectory and one regular file.
+  path base_dir = filesystem::unique_path();
+  path dir = base_dir / "impala-dir";
+  path subdir = dir / "impala-subdir";
+  path file = dir / "impala-file";
+
+  ASSERT_OK(FileSystemUtil::RemoveAndCreateDirectory(subdir.string()));
+  ASSERT_OK(FileSystemUtil::CreateFile(file.string()));
+
+  // Verify that all directory entries are listed with the default parameters.
+  vector<string> entries;
+  ASSERT_OK(FileSystemUtil::Directory::GetEntryNames(dir.string(), &entries));
+  ASSERT_EQ(entries.size(), 2);
+  for (const string& entry : entries) {
+    EXPECT_TRUE(entry == "impala-subdir" || entry == "impala-file");
+  }
+
+  // Verify that only directory type entries are listed with DIR_ENTRY_DIR.
+  entries.resize(0);
+  ASSERT_OK(FileSystemUtil::Directory::GetEntryNames(dir.string(), &entries, 0,
+      FileSystemUtil::Directory::DIR_ENTRY_DIR));
+  ASSERT_EQ(entries.size(), 1);
+  EXPECT_TRUE(entries[0] == "impala-subdir");
+
+  // Verify that only file type entries are listed with DIR_ENTRY_REG.
+  entries.resize(0);
+  ASSERT_OK(FileSystemUtil::Directory::GetEntryNames(dir.string(), &entries, 0,
+      FileSystemUtil::Directory::DIR_ENTRY_REG));
+  ASSERT_EQ(entries.size(), 1);
+  EXPECT_TRUE(entries[0] == "impala-file");
+
+  // Cleanup.
+  filesystem::remove_all(dir);
+}
diff --git a/be/src/util/filesystem-util.cc b/be/src/util/filesystem-util.cc
index 83fce2f..13775a9 100644
--- a/be/src/util/filesystem-util.cc
+++ b/be/src/util/filesystem-util.cc
@@ -16,14 +16,27 @@
 // under the License.
 
 #include <fcntl.h>
+#include <string.h>
 #include <sys/resource.h>
 #include <sys/stat.h>
 #include <boost/filesystem.hpp>
+#include <gutil/strings/numbers.h>
 #include <gutil/strings/substitute.h>
+#include <gutil/strings/util.h>
 
+#include "exec/kudu-util.h"
+#include "gutil/macros.h"
 #include "runtime/io/error-converter.h"
+#include "kudu/util/env.h"
+#include "kudu/util/path_util.h"
 #include "util/filesystem-util.h"
 #include "util/error-util.h"
+#include "util/scope-exit-trigger.h"
+#include "util/uid-util.h"
+
+#ifndef FALLOC_FL_PUNCH_HOLE
+#include <linux/falloc.h>
+#endif
 
 #include "common/names.h"
 
@@ -31,6 +44,10 @@ namespace errc = boost::system::errc;
 namespace filesystem = boost::filesystem;
 
 using boost::system::error_code;
+using kudu::Env;
+using kudu::JoinPathSegments;
+using kudu::RWFile;
+using kudu::Slice;
 using std::exception;
 using namespace strings;
 
@@ -97,7 +114,8 @@ Status FileSystemUtil::RemovePaths(const vector<string>& directories) {
 }
 
 Status FileSystemUtil::CreateFile(const string& file_path) {
-  int fd = creat(file_path.c_str(), S_IRUSR | S_IWUSR);
+  int fd;
+  RETRY_ON_EINTR(fd, creat(file_path.c_str(), S_IRUSR | S_IWUSR));
 
   if (fd < 0) {
     return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
@@ -223,7 +241,6 @@ bool FileSystemUtil::GetRelativePath(const string& path, const string& start,
   return false;
 }
 
-
 FileSystemUtil::Directory::Directory(const string& path)
     : dir_path_(path),
       status_(Status::OK()) {
@@ -237,7 +254,7 @@ FileSystemUtil::Directory::~Directory() {
   if (dir_stream_ != nullptr) (void)closedir(dir_stream_);
 }
 
-bool FileSystemUtil::Directory::GetNextEntryName(string* entry_name) {
+bool FileSystemUtil::Directory::GetNextEntryName(string* entry_name, EntryType type) {
   DCHECK(entry_name != nullptr);
 
   if (status_.ok()) {
@@ -250,6 +267,14 @@ bool FileSystemUtil::Directory::GetNextEntryName(string* entry_name) {
           || strcmp(dir_entry->d_name, "..") == 0) {
         continue;
       }
+      if (type != DIR_ENTRY_ANY && dir_entry->d_type != DT_UNKNOWN) {
+        if (type == DIR_ENTRY_REG && dir_entry->d_type != DT_REG) {
+          continue;
+        }
+        if (type == DIR_ENTRY_DIR && dir_entry->d_type != DT_DIR) {
+          continue;
+        }
+      }
       *entry_name = dir_entry->d_name;
       return true;
     }
@@ -265,18 +290,114 @@ bool FileSystemUtil::Directory::GetNextEntryName(string* entry_name) {
 }
 
 Status FileSystemUtil::Directory::GetEntryNames(const string& path,
-    vector<string>* entry_names, int max_result_size) {
+    vector<string>* entry_names, int max_result_size, EntryType type) {
   DCHECK(entry_names != nullptr);
 
   Directory dir(path);
   entry_names->clear();
   string entry_name;
-  while ((max_result_size <= 0 || entry_names->size() < max_result_size)
-      && dir.GetNextEntryName(&entry_name)) {
+  while ((max_result_size <= 0 || entry_names->size() < max_result_size) &&
+      dir.GetNextEntryName(&entry_name, type)) {
     entry_names->push_back(entry_name);
   }
 
   return dir.GetLastStatus();
 }
 
+// Copied and pasted from Kudu source: src/kudu/fs/log_block_manager.cc
+bool FileSystemUtil::IsBuggyEl6Kernel(const string& kernel_release) {
+  autodigit_less lt;
+
+  // Only el6 is buggy.
+  if (kernel_release.find("el6") == string::npos) return false;
+
+  // Kernels in the 6.8 update stream (2.6.32-642.a.b) are fixed
+  // for a >= 15.
+  //
+  // https://rhn.redhat.com/errata/RHSA-2017-0307.html
+  if (MatchPattern(kernel_release, "2.6.32-642.*.el6.*") &&
+      lt("2.6.32-642.15.0", kernel_release)) {
+    return false;
+  }
+
+  // If the kernel is older than 2.6.32-674 (el6.9), it's buggy.
+  return lt(kernel_release, "2.6.32-674");
+}
+
+Status FileSystemUtil::CheckHolePunch(const string& path) {
+  kudu::Env* env = kudu::Env::Default();
+
+  // Check if the filesystem of 'path' is vulnerable to KUDU-1508.
+  bool is_on_ext;
+  KUDU_RETURN_IF_ERROR(env->IsOnExtFilesystem(path, &is_on_ext),
+      Substitute("Failed to check filesystem type at $0", path));
+  if (is_on_ext && IsBuggyEl6Kernel(env->GetKernelRelease())) {
+    return Status(Substitute("Data dir $0 is on an ext4 filesystem vulnerable to "
+        "KUDU-1508.", path));
+  }
+
+  // Open the test file.
+  string filename = JoinPathSegments(path, PrintId(GenerateUUID()));
+  unique_ptr<RWFile> test_file;
+  KUDU_RETURN_IF_ERROR(kudu::Env::Default()->NewRWFile(filename, &test_file),
+      Substitute("Failed to create file $0", filename));
+
+  // Delete file on exit from the function.
+  auto delete_file = MakeScopeExitTrigger([&filename]() {
+      kudu::Env::Default()->DeleteFile(filename);
+  });
+
+  const int buffer_size = 4096 * 4;
+  unique_ptr<uint8_t[]> buffer(new uint8_t[buffer_size]);
+  memset(buffer.get(), 0xaa, buffer_size);
+  for (int i = 0; i < 4; ++i) {
+    KUDU_RETURN_IF_ERROR(test_file->Write(i * buffer_size,
+        Slice(buffer.get(), buffer_size)),
+        Substitute("Failed to write to file $0", path));
+  }
+
+  const off_t init_file_size = buffer_size * 4;
+  uint64_t sz;
+  KUDU_RETURN_IF_ERROR(env->GetFileSizeOnDisk(filename, &sz),
+      "Failed to get pre-punch file size");
+  if (sz != init_file_size) {
+    return Status(Substitute("Unexpected pre-punch file size for $0: expected $1 but "
+        "got $2", filename, init_file_size, sz));
+  }
+
+  // Punch the hole, testing the file's size again.
+  const off_t hole_offset = buffer_size;
+  const off_t hole_size = buffer_size * 2;
+  KUDU_RETURN_IF_ERROR(test_file->PunchHole(hole_offset, hole_size),
+      "Failed to punch hole");
+
+  const int final_file_size = init_file_size - hole_size;
+  KUDU_RETURN_IF_ERROR(env->GetFileSizeOnDisk(filename, &sz),
+      "Failed to get post-punch file size");
+  if (sz != final_file_size) {
+    return Status(Substitute("Unexpected post-punch file size for $0: expected $1 but "
+        "got $2", filename, final_file_size, sz));
+  }
+
+  int offset = 0;
+  unique_ptr<uint8_t[]> tmp_buffer(new uint8_t[buffer_size]);
+  memset(tmp_buffer.get(), 0, buffer_size);
+  KUDU_RETURN_IF_ERROR(test_file->Read(offset, Slice(tmp_buffer.get(), buffer_size)),
+      Substitute("Failed to read file $0", path));
+  if (memcmp(tmp_buffer.get(), buffer.get(), buffer_size) != 0) {
+    return Status(Substitute("Mismatched file content $0 at offset 0", filename));
+  }
+
+  offset = hole_offset + hole_size;
+  memset(tmp_buffer.get(), 0, buffer_size);
+  KUDU_RETURN_IF_ERROR(test_file->Read(offset, Slice(tmp_buffer.get(), buffer_size)),
+      Substitute("Failed to read file $0", path));
+  if (memcmp(tmp_buffer.get(), buffer.get(), buffer_size) != 0) {
+    return Status(Substitute("Mismatched file content $0 at offset $1", filename,
+        offset));
+  }
+
+  return Status::OK();
+}
+
 } // namespace impala
diff --git a/be/src/util/filesystem-util.h b/be/src/util/filesystem-util.h
index c87fb9c..b354f5f 100644
--- a/be/src/util/filesystem-util.h
+++ b/be/src/util/filesystem-util.h
@@ -85,8 +85,35 @@ class FileSystemUtil {
   static bool GetRelativePath(const std::string& path, const std::string& start,
       std::string* relpath);
 
+  /// Ext4 on certain El6 releases may result in inconsistent metadata after punching
+  /// holes in files. The filesystem may require fsck repair on next reboot. See KUDU-1508
+  /// for details. This function returns true iff the kernel version Impala is running on
+  /// is vulnerable to KUDU-1508.
+  static bool IsBuggyEl6Kernel(const string& kernel_release);
+
+  /// Checks if the filesystem at the directory 'path' supports hole punching (i.e.
+  /// calling fallocate with FALLOC_FL_PUNCH_HOLE).
+  ///
+  /// Return error status if:
+  /// - 'path' resides in an ext filesystem and the kernel version is vulnerable to
+  ///    KUDU-1508.
+  /// - creating a test file at 'path' failed.
+  /// - punching holes in test file failed.
+  /// - reading the test file's size failed.
+  ///
+  /// Returns OK otherwise.
+  static Status CheckHolePunch(const string& path);
+
   class Directory {
    public:
+    // Different types of entry in the directory
+    enum EntryType {
+      DIR_ENTRY_ANY = 0,
+      DIR_ENTRY_REG, // regular file (DT_REG in readdir() result)
+      DIR_ENTRY_DIR, // directory    (DT_DIR in readdir() result)
+      DIR_ENTRY_NUM_TYPES
+    };
+
     /// Opens 'path' directory for iteration. Directory entries "." and ".." will be
     /// skipped while iterating through the entries.
     Directory(const string& path);
@@ -96,17 +123,19 @@ class FileSystemUtil {
 
     /// Reads the next directory entry and sets 'entry_name' to the entry name.
     /// Returns 'false' if an error occured or no more entries were found in the
-    /// directory. Return 'true' on success.
-    bool GetNextEntryName(std::string* entry_name);
+    /// directory. If 'type' is specified, only entries of 'type' will be included.
+    /// Return 'true' on success.
+    bool GetNextEntryName(std::string* entry_name, EntryType type = DIR_ENTRY_ANY);
 
     /// Returns the status of the previous directory operation.
     const Status& GetLastStatus() const { return status_; }
 
     /// Reads no more than 'max_result_size' directory entries from 'path' and returns
     /// their names in 'entry_names' vector. If 'max_result_size' <= 0, every directory
-    /// entry is returned. Directory entries "." and ".." will be skipped.
+    /// entry is returned. Directory entries "." and ".." will be skipped. If 'type' is
+    /// specified, only entries of 'type' will be included.
     static Status GetEntryNames(const string& path, std::vector<std::string>* entry_names,
-        int max_result_size = 0);
+        int max_result_size = 0, EntryType type = DIR_ENTRY_ANY);
 
    private:
     DIR* dir_stream_;
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index 6db82e0..cdebe4c 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -56,6 +56,14 @@ const char* ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ =
     "impala-server.io-mgr.short-circuit-bytes-read";
 const char* ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ =
     "impala-server.io-mgr.cached-bytes-read";
+const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES =
+    "impala-server.io-mgr.remote-data-cache-hit-bytes";
+const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES =
+    "impala-server.io-mgr.remote-data-cache-miss-bytes";
+const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES =
+    "impala-server.io-mgr.remote-data-cache-total-bytes";
+const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES =
+    "impala-server.io-mgr.remote-data-cache-dropped-bytes";
 const char* ImpaladMetricKeys::IO_MGR_BYTES_WRITTEN =
     "impala-server.io-mgr.bytes-written";
 const char* ImpaladMetricKeys::IO_MGR_NUM_CACHED_FILE_HANDLES =
@@ -141,6 +149,9 @@ IntCounter* ImpaladMetrics::IO_MGR_BYTES_READ = NULL;
 IntCounter* ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ = NULL;
 IntCounter* ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ = NULL;
 IntCounter* ImpaladMetrics::IO_MGR_CACHED_BYTES_READ = NULL;
+IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES = NULL;
+IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES = NULL;
+IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES = NULL;
 IntCounter* ImpaladMetrics::IO_MGR_BYTES_WRITTEN = NULL;
 IntCounter* ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_REOPENED = NULL;
 IntCounter* ImpaladMetrics::HEDGED_READ_OPS = NULL;
@@ -168,7 +179,7 @@ IntGauge* ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES = NULL;
 IntGauge* ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = NULL;
 IntGauge* ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT = NULL;
 IntGauge* ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT = NULL;
-IntGauge* ImpaladMetrics::IO_MGR_TOTAL_BYTES = NULL;
+IntGauge* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES = NULL;
 IntGauge* ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT = NULL;
 IntGauge* ImpaladMetrics::NUM_QUERIES_REGISTERED = NULL;
 IntGauge* ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS = NULL;
@@ -305,6 +316,15 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
   IO_MGR_BYTES_WRITTEN = m->AddCounter(
       ImpaladMetricKeys::IO_MGR_BYTES_WRITTEN, 0);
 
+  IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES = m->AddCounter(
+      ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES, 0);
+  IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES = m->AddCounter(
+      ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES, 0);
+  IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES = m->AddGauge(
+      ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES, 0);
+  IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES = m->AddCounter(
+      ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES, 0);
+
   IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO =
       StatsMetric<uint64_t, StatsType::MEAN>::CreateAndRegister(m,
       ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO);
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index 2a25e86..b39e4cc 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -79,6 +79,19 @@ class ImpaladMetricKeys {
   /// Total number of cached bytes read by the io mgr
   static const char* IO_MGR_CACHED_BYTES_READ;
 
+  /// Total number of bytes read from the remote data cache.
+  static const char* IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES;
+
+  /// Total number of bytes missing from the remote data cache.
+  static const char* IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES;
+
+  /// Current byte size of the remote data cache.
+  static const char* IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES;
+
+  /// Total number of bytes not inserted into the remote data cache due to
+  /// concurrency limit.
+  static const char* IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES;
+
   /// Total number of bytes written to disk by the io mgr (for spilling)
   static const char* IO_MGR_BYTES_WRITTEN;
 
@@ -221,6 +234,9 @@ class ImpaladMetrics {
   static IntCounter* IO_MGR_BYTES_READ;
   static IntCounter* IO_MGR_LOCAL_BYTES_READ;
   static IntCounter* IO_MGR_CACHED_BYTES_READ;
+  static IntCounter* IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES;
+  static IntCounter* IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES;
+  static IntCounter* IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES;
   static IntCounter* IO_MGR_SHORT_CIRCUIT_BYTES_READ;
   static IntCounter* IO_MGR_BYTES_WRITTEN;
   static IntCounter* IO_MGR_CACHED_FILE_HANDLES_REOPENED;
@@ -253,7 +269,7 @@ class ImpaladMetrics {
   static IntGauge* IO_MGR_NUM_FILE_HANDLES_OUTSTANDING;
   static IntGauge* IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT;
   static IntGauge* IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT;
-  static IntGauge* IO_MGR_TOTAL_BYTES;
+  static IntGauge* IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES;
   static IntGauge* NUM_FILES_OPEN_FOR_INSERT;
   static IntGauge* NUM_QUERIES_REGISTERED;
   static IntGauge* RESULTSET_CACHE_TOTAL_NUM_ROWS;
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 47193fb..b7df29b 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -112,7 +112,12 @@ parser.add_option("--docker_auto_ports", dest="docker_auto_ports",
                        "(Beewax, HS2, Web UIs, etc), which avoids collisions with other "
                        "running processes. If false, ports are mapped to the same ports "
                        "on localhost as the non-docker impala cluster.")
-
+parser.add_option("--data_cache_dir", dest="data_cache_dir", default=None,
+                  help="This specifies a base directory in which the IO data cache will "
+                       "use.")
+parser.add_option("--data_cache_size", dest="data_cache_size", default=0,
+                  help="This specifies the maximum storage usage of the IO data cache "
+                       "each Impala daemon can use.")
 
 # For testing: list of comma-separated delays, in milliseconds, that delay impalad catalog
 # replica initialization. The ith delay is applied to the ith impalad.
@@ -328,6 +333,18 @@ def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordi
           delay=delay_list[i],
           args=args)
 
+    if options.data_cache_dir:
+      # create the base directory
+      assert options.data_cache_size != 0, "--data_cache_dir must be used along " \
+          "with --data_cache_size"
+      data_cache_path = \
+          os.path.join(options.data_cache_dir, "impala-datacache-{0}".format(str(i)))
+      # Try creating the directory if it doesn't exist already. May raise exception.
+      if not os.path.exists(data_cache_path):
+        os.mkdir(data_cache_path)
+      args = "-data_cache={dir}:{quota} {args}".format(
+          dir=data_cache_path, quota=options.data_cache_size, args=args)
+
     # Appended at the end so they can override previous args.
     if i < len(per_impalad_args):
       args = "{args} {per_impalad_args}".format(
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index fa04749..4437325 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -400,6 +400,46 @@
     "key": "impala-server.io-mgr.local-bytes-read"
   },
   {
+    "description": "Total number of bytes of hits in the remote data cache.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Remote Data Cache Hit In Bytes",
+    "units": "BYTES",
+    "kind": "COUNTER",
+    "key": "impala-server.io-mgr.remote-data-cache-hit-bytes"
+  },
+  {
+    "description": "Total number of bytes of misses in the remote data cache.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Remote Data Cache Miss In Bytes",
+    "units": "BYTES",
+    "kind": "COUNTER",
+    "key": "impala-server.io-mgr.remote-data-cache-miss-bytes"
+  },
+  {
+    "description": "Current byte size of the remote data cache.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Remote Data Cache Bytes Size",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "impala-server.io-mgr.remote-data-cache-total-bytes"
+  },
+  {
+    "description": "Total number of bytes not inserted in remote data cache due to concurrency limit.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Remote Data Cache Bytes Not Inserted Due To Concurrency limit",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "impala-server.io-mgr.remote-data-cache-dropped-bytes"
+  },
+  {
     "description": "The number of allocated IO buffers. IO buffers are shared by all queries.",
     "contexts": [
       "IMPALAD"
diff --git a/testdata/workloads/functional-query/queries/QueryTest/data-cache.test b/testdata/workloads/functional-query/queries/QueryTest/data-cache.test
new file mode 100644
index 0000000..4cd1062
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/data-cache.test
@@ -0,0 +1,49 @@
+====
+---- QUERY
+create table test_parquet stored as parquet as select * from tpch_parquet.lineitem;
+---- RUNTIME_PROFILE
+# Expect all cache misses for tpch_parquet.lineitem.
+row_regex: .*DataCacheHitBytes: 0.*
+row_regex: .*DataCacheHitCount: 0 \(0\).*
+row_regex: .*DataCacheMissCount: 64 \(64\).*
+====
+---- QUERY
+select count(*) from tpch_parquet.lineitem t1, test_parquet t2 where t1.l_orderkey = t2.l_orderkey;
+---- RUNTIME_PROFILE
+# Expect cache hits for t1 and cache misses for t2.
+row_regex: .*DataCacheHitCount: 6 \(6\).*
+row_regex: .*DataCacheMissBytes: 0.*
+row_regex: .*DataCacheMissCount: 0 \(0\).*
+row_regex: .*DataCachePartialHitCount: 0 \(0\).*
+row_regex: .*DataCacheHitBytes: 0.*
+row_regex: .*DataCacheHitCount: 0 \(0\).*
+row_regex: .*DataCacheMissCount: 3 \(3\).*
+row_regex: .*DataCachePartialHitCount: 0 \(0\).*
+====
+---- QUERY
+select count(distinct l_orderkey) from test_parquet;
+---- RESULTS
+1500000
+---- RUNTIME_PROFILE
+# Expect all cache hits.
+row_regex: .*DataCacheHitCount: 3 \(3\).*
+row_regex: .*DataCacheMissBytes: 0.*
+row_regex: .*DataCacheMissCount: 0 \(0\).*
+row_regex: .*DataCachePartialHitCount: 0 \(0\).*
+====
+---- QUERY
+# Overwrite temp table with subset of data.
+insert overwrite test_parquet select * from tpch_parquet.lineitem where l_shipmode = 'AIR';
+====
+---- QUERY
+# Verifies that stale data from the cache is not used due to change in mtime.
+select count(distinct l_orderkey) from test_parquet;
+---- RESULTS
+652393
+---- RUNTIME_PROFILE
+# Expect all cache misses due to change in mtime.
+row_regex: .*DataCacheHitBytes: 0.*
+row_regex: .*DataCacheHitCount: 0 \(0\).*
+row_regex: .*DataCacheMissCount: 2 \(2\).*
+row_regex: .*DataCachePartialHitCount: 0 \(0\).*
+====
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index d221b43..70af069 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -115,7 +115,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       if catalogd_args is not None:
         func.func_dict[CATALOGD_ARGS] = catalogd_args
       if start_args is not None:
-        func.func_dict[START_ARGS] = start_args
+        func.func_dict[START_ARGS] = start_args.split()
       if sentry_config is not None:
         func.func_dict[SENTRY_CONFIG] = sentry_config
       if sentry_log_dir is not None:
@@ -137,7 +137,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       if arg in method.func_dict:
         cluster_args.append("--%s=%s " % (arg, method.func_dict[arg]))
     if START_ARGS in method.func_dict:
-      cluster_args.append(method.func_dict[START_ARGS])
+      cluster_args.extend(method.func_dict[START_ARGS])
 
     if SENTRY_CONFIG in method.func_dict:
       self._start_sentry_service(method.func_dict[SENTRY_CONFIG],
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 7fe8216..0be8c95 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -18,12 +18,14 @@
 # The base class that should be used for almost all Impala tests
 
 import grp
+import json
 import logging
 import os
 import pprint
 import pwd
 import pytest
 import re
+import requests
 import shutil
 import socket
 import subprocess
@@ -115,6 +117,7 @@ EE_TEST_LOGS_DIR = os.getenv("IMPALA_EE_TEST_LOGS_DIR")
 COMMENT_LINES_REGEX = r'(?:\s*--.*\n)*'
 SET_PATTERN = re.compile(
     COMMENT_LINES_REGEX + r'\s*set\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*=*', re.I)
+METRICS_URL = 'http://localhost:25000/metrics?json'
 
 
 # Base class for Impala tests. All impala test cases should inherit from this class
@@ -302,6 +305,27 @@ class ImpalaTestSuite(BaseTestSuite):
       result.append(tuple(result_fields))
     return result
 
+  def get_debug_page(self, page_url):
+    """Returns the content of the debug page 'page_url' as json."""
+    response = requests.get(page_url)
+    assert response.status_code == requests.codes.ok
+    return json.loads(response.text)
+
+  def get_metric(self, name):
+    """Finds the metric with name 'name' and returns its value as an int."""
+    def iter_metrics(group):
+      for m in group['metrics']:
+        yield m
+      for c in group['child_groups']:
+        for m in iter_metrics(c):
+          yield m
+
+    metrics = self.get_debug_page(METRICS_URL)['metric_group']
+    for m in iter_metrics(metrics):
+      if m['name'] == name:
+        return int(m['value'])
+    assert False, "Could not find metric: %s" % name
+
   def __verify_exceptions(self, expected_strs, actual_str, use_db):
     """
     Verifies that at least one of the strings in 'expected_str' is either:
diff --git a/tests/custom_cluster/test_data_cache.py b/tests/custom_cluster/test_data_cache.py
new file mode 100644
index 0000000..f8db6b0
--- /dev/null
+++ b/tests/custom_cluster/test_data_cache.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+
+class TestDataCache(CustomClusterTestSuite):
+  """ This test enables the data cache and verifies that cache hit and miss counts
+  in the runtime profile and metrics are as expected.
+  """
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--always_use_data_cache=true --data_cache_write_concurrency=64",
+      start_args="--data_cache_dir=/tmp --data_cache_size=500MB", cluster_size=1)
+  def test_data_cache(self, vector, unique_database):
+      """ This test creates a temporary table from another table, overwrites it with
+      some other data and verifies that no stale data is read from the cache. Runs
+      with a single node to make it easier to verify the runtime profile.  """
+      self.run_test_case('QueryTest/data-cache', vector, unique_database)
+      assert self.get_metric('impala-server.io-mgr.remote-data-cache-dropped-bytes') >= 0
+      assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-bytes') > 0
+      assert self.get_metric('impala-server.io-mgr.remote-data-cache-miss-bytes') > 0
+      assert self.get_metric('impala-server.io-mgr.remote-data-cache-total-bytes') > 0
diff --git a/tests/custom_cluster/test_krpc_metrics.py b/tests/custom_cluster/test_krpc_metrics.py
index e728237..0acc72e 100644
--- a/tests/custom_cluster/test_krpc_metrics.py
+++ b/tests/custom_cluster/test_krpc_metrics.py
@@ -15,9 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import json
 import pytest
-import requests
 import time
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_cluster import ImpalaCluster
@@ -27,7 +25,6 @@ from tests.verifiers.mem_usage_verifier import MemUsageVerifier
 class TestKrpcMetrics(CustomClusterTestSuite):
   """Test for KRPC metrics that require special arguments during cluster startup."""
   RPCZ_URL = 'http://localhost:25000/rpcz?json'
-  METRICS_URL = 'http://localhost:25000/metrics?json'
   TEST_QUERY = 'select count(*) from tpch_parquet.lineitem l1 \
       join tpch_parquet.lineitem l2 where l1.l_orderkey = l2.l_orderkey;'
 
@@ -41,12 +38,6 @@ class TestKrpcMetrics(CustomClusterTestSuite):
       pytest.skip('runs only in exhaustive')
     super(TestKrpcMetrics, cls).setup_class()
 
-  def get_debug_page(self, page_url):
-    """Returns the content of the debug page 'page_url' as json."""
-    response = requests.get(page_url)
-    assert response.status_code == requests.codes.ok
-    return json.loads(response.text)
-
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args('-datastream_service_queue_mem_limit=1B \
                                      -datastream_service_num_svc_threads=1')
@@ -68,21 +59,6 @@ class TestKrpcMetrics(CustomClusterTestSuite):
 
     assert before < after
 
-  def get_metric(self, name):
-    """Finds the metric with name 'name' and returns its value as an int."""
-    def iter_metrics(group):
-      for m in group['metrics']:
-        yield m
-      for c in group['child_groups']:
-        for m in iter_metrics(c):
-          yield m
-
-    metrics = self.get_debug_page(self.METRICS_URL)['metric_group']
-    for m in iter_metrics(metrics):
-      if m['name'] == name:
-        return int(m['value'])
-    assert False, "Could not find metric: %s" % name
-
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args('-datastream_service_queue_mem_limit=1B \
                                      -datastream_service_num_svc_threads=1')