Posted to commits@impala.apache.org by jo...@apache.org on 2019/05/23 18:30:16 UTC

[impala] branch master updated: IMPALA-8344: Add support for running the minicluster with S3Guard

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b09612  IMPALA-8344: Add support for running the minicluster with S3Guard
6b09612 is described below

commit 6b09612e763aace6ec3ec22031e4e960b9a41e3d
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Fri Mar 22 14:16:31 2019 -0700

    IMPALA-8344: Add support for running the minicluster with S3Guard
    
    Some tests can fail on S3 because certain S3 operations are only
    eventually consistent. S3Guard stores extra metadata in a DynamoDB
    table to address several of these consistency issues.
    
    This adds support for running the minicluster on S3 with S3Guard.
    S3Guard is configured by the following environment variables:
    S3GUARD_ENABLED: defaults to false; set to true to enable S3Guard.
    S3GUARD_DYNAMODB_TABLE: name of the DynamoDB table to use. This must
      be exclusively owned by this minicluster. The dataload scripts
      initialize this table and will purge entries if the table already
      exists. The table should be in the same region as the S3_BUCKET
      for the minicluster.
    S3GUARD_DYNAMODB_REGION: AWS region for S3GUARD_DYNAMODB_TABLE.
    These environment variables only impact S3 configurations.
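
    For illustration, a minicluster run against S3 with S3Guard might be
    configured like this before dataload (the bucket, table, and region
    names below are hypothetical placeholders):

      export TARGET_FILESYSTEM=s3
      export S3_BUCKET=my-impala-test-bucket
      export S3GUARD_ENABLED=true
      export S3GUARD_DYNAMODB_TABLE=my-impala-s3guard-table
      export S3GUARD_DYNAMODB_REGION=us-west-2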
    
    The support comes from three pieces:
    1. Configuration changes in core-site.xml to add the appropriate
       parameters.
    2. Updates to dataload to initialize/purge the S3Guard DynamoDB table
       and import data appropriately.
    3. Updates to tests so that they manipulate files through the HDFS
       command line rather than through S3 utilities. This takes the
       filesystem utility code for ABFS (which already calls the HDFS
       command line), makes it generic, and uses it for S3Guard
       (typical invocations are sketched below).
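
    As a rough sketch, the generic command-line client ends up issuing
    plain 'hadoop fs' invocations such as the following (the paths are
    hypothetical examples):

      hadoop fs -mkdir -p /test-warehouse/tmp_example
      hadoop fs -put /tmp/local_file.txt /test-warehouse/tmp_example/
      hadoop fs -test -e /test-warehouse/tmp_example/local_file.txt && echo exists
      hadoop fs -rm -r /test-warehouse/tmp_example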
    
    Testing:
     - Ran multiple rounds of s3 tests
     - Aborted tests in the middle and restarted the s3 tests (to test
       the s3guard reinitialization code)
    
    Change-Id: I3c748529a494bb6e70fec96dc031523ff79bf61d
    Reviewed-on: http://gerrit.cloudera.org:8080/13020
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Sahil Takiar <st...@cloudera.com>
---
 bin/generate_xml_config.py                         |   5 +-
 bin/impala-config.sh                               |  13 ++
 bin/jenkins/release_cloud_resources.sh             |  50 +++++++
 infra/python/deps/requirements.txt                 |   1 +
 testdata/bin/load-test-warehouse-snapshot.sh       |   8 ++
 .../common/etc/hadoop/conf/core-site.xml.py        | 111 ++++++++++++++++
 .../common/etc/hadoop/conf/core-site.xml.tmpl      | 145 ---------------------
 tests/common/impala_test_suite.py                  |  13 +-
 tests/query_test/test_scanners_fuzz.py             |   2 +-
 tests/util/abfs_util.py                            | 113 ----------------
 tests/util/filesystem_utils.py                     |   1 +
 tests/util/hdfs_util.py                            | 124 +++++++++++++++++-
 tests/util/s3_util.py                              | 103 ---------------
 13 files changed, 316 insertions(+), 373 deletions(-)

diff --git a/bin/generate_xml_config.py b/bin/generate_xml_config.py
index 18e3615..af9c2e5 100755
--- a/bin/generate_xml_config.py
+++ b/bin/generate_xml_config.py
@@ -80,9 +80,10 @@ def dump_config(d, source_path, out):
   print >>out, dedent(header)
   for k, v in sorted(d.iteritems()):
     try:
+      k_new = _substitute_env_vars(k)
       if isinstance(v, int):
         v = str(v)
-      v = _substitute_env_vars(v)
+      v_new = _substitute_env_vars(v)
     except KeyError, e:
       raise Exception("failed environment variable substitution for value {k}: {e}"
                       .format(k=k, e=e))
@@ -90,7 +91,7 @@ def dump_config(d, source_path, out):
       <property>
         <name>{name}</name>
         <value>{value}</value>
-      </property>""".format(name=xmlescape(k), value=xmlescape(v))
+      </property>""".format(name=xmlescape(k_new), value=xmlescape(v_new))
   print >>out, "</configuration>"
 
 
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index b57d625..fca708d 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -284,6 +284,9 @@ export TARGET_FILESYSTEM="${TARGET_FILESYSTEM-hdfs}"
 export ERASURE_CODING="${ERASURE_CODING-false}"
 export FILESYSTEM_PREFIX="${FILESYSTEM_PREFIX-}"
 export S3_BUCKET="${S3_BUCKET-}"
+export S3GUARD_ENABLED="${S3GUARD_ENABLED-false}"
+export S3GUARD_DYNAMODB_TABLE="${S3GUARD_DYNAMODB_TABLE-}"
+export S3GUARD_DYNAMODB_REGION="${S3GUARD_DYNAMODB_REGION-}"
 export azure_tenant_id="${azure_tenant_id-DummyAdlsTenantId}"
 export azure_client_id="${azure_client_id-DummyAdlsClientId}"
 export azure_client_secret="${azure_client_secret-DummyAdlsClientSecret}"
@@ -401,6 +404,16 @@ if [ "${TARGET_FILESYSTEM}" = "s3" ]; then
   else
     echo "S3 access already validated"
   fi
+  # If using s3guard, verify that the dynamodb table and region are set
+  if [[ "${S3GUARD_ENABLED}" = "true" ]]; then
+    if [[ -z "${S3GUARD_DYNAMODB_TABLE}" || -z "${S3GUARD_DYNAMODB_REGION}" ]]; then
+      echo "When S3GUARD_ENABLED=true, S3GUARD_DYNAMODB_TABLE and
+        S3GUARD_DYNAMODB_REGION must be set"
+      echo "S3GUARD_DYNAMODB_TABLE: ${S3GUARD_DYNAMODB_TABLE}"
+      echo "S3GUARD_DYNAMODB_REGION: ${S3GUARD_DYNAMODB_REGION}"
+      return 1
+    fi
+  fi
 elif [ "${TARGET_FILESYSTEM}" = "adls" ]; then
   # Basic error checking
   if [[ "${azure_client_id}" = "DummyAdlsClientId" ||\
diff --git a/bin/jenkins/release_cloud_resources.sh b/bin/jenkins/release_cloud_resources.sh
new file mode 100755
index 0000000..a03f955
--- /dev/null
+++ b/bin/jenkins/release_cloud_resources.sh
@@ -0,0 +1,50 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Release cloud resources (useful for Jenkins jobs). Should be called
+# when the minicluster is shut down. The minicluster should not be used
+# after this is called, as it removes the test warehouse data.
+
+set -euo pipefail
+trap 'echo Error in $0 at line $LINENO: $(cd "'$PWD'" && awk "NR == $LINENO" $0)' ERR
+
+. ${IMPALA_HOME}/bin/impala-config.sh > /dev/null 2>&1
+: ${TEST_WAREHOUSE_DIR=/test-warehouse}
+
+# This is currently only implemented for s3.
+# TODO: implement this for other cloud filesystems
+# NOTE: Some environment variables referenced here are checked for validity in
+# bin/impala-config.sh. Because this is releasing resources, we double check them here
+# as well.
+if [[ "${TARGET_FILESYSTEM}" == "s3" ]]; then
+  # For S3, S3_BUCKET should always be defined.
+  [[ -n "${S3_BUCKET}" ]]
+  if [[ "${S3GUARD_ENABLED}" == "true" ]]; then
+    # If S3GUARD_ENABLED == true, then S3GUARD_DYNAMODB_TABLE and S3GUARD_DYNAMODB_REGION
+    # must also be defined. Verify that before proceeding.
+    [[ -n "${S3GUARD_DYNAMODB_TABLE}" && -n "${S3GUARD_DYNAMODB_REGION}" ]]
+    echo "Cleaning up s3guard and deleting Dynamo DB ${S3GUARD_DYNAMODB_TABLE} ..."
+    hadoop s3guard destroy -meta "dynamodb://${S3GUARD_DYNAMODB_TABLE}" \
+        -region "${S3GUARD_DYNAMODB_REGION}"
+    echo "Done cleaning up s3guard"
+  fi
+  # Remove the test warehouse
+  echo "Removing test warehouse from s3://${S3_BUCKET}${TEST_WAREHOUSE_DIR} ..."
+  aws s3 rm --recursive --quiet s3://${S3_BUCKET}${TEST_WAREHOUSE_DIR}
+  echo "Done removing test warehouse"
+fi
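
A hypothetical Jenkins teardown might invoke this new script once the tests
have finished, for example:

  # Assumes IMPALA_HOME and the S3/S3Guard environment variables are set.
  "${IMPALA_HOME}/bin/jenkins/release_cloud_resources.sh"
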
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index fd07ca8..fa0deb0 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -23,6 +23,7 @@
 # multiple times (though maybe they could be).
 
 allpairs == 2.0.1
+# TODO: boto3 is now unused; it can be removed.
 boto3 == 1.2.3
   simplejson == 3.3.0 # For python version 2.6
   botocore == 1.3.30
diff --git a/testdata/bin/load-test-warehouse-snapshot.sh b/testdata/bin/load-test-warehouse-snapshot.sh
index cfec558..c53cb03 100755
--- a/testdata/bin/load-test-warehouse-snapshot.sh
+++ b/testdata/bin/load-test-warehouse-snapshot.sh
@@ -59,6 +59,14 @@ if [[ "$REPLY" =~ ^[Yy]$ ]]; then
       echo "Deleting pre-existing data in s3 failed, aborting."
       exit 1
     fi
+    if [[ "${S3GUARD_ENABLED}" = "true" ]]; then
+      # Initialize the s3guard dynamodb table and clear it out. This is valid even if
+      # the table already exists.
+      hadoop s3guard init -meta "dynamodb://${S3GUARD_DYNAMODB_TABLE}" \
+        -region "${S3GUARD_DYNAMODB_REGION}"
+      hadoop s3guard prune -seconds 1 -meta "dynamodb://${S3GUARD_DYNAMODB_TABLE}" \
+        -region "${S3GUARD_DYNAMODB_REGION}"
+    fi
   else
     # Either isilon or hdfs, no change in procedure.
     if hadoop fs -test -d ${FILESYSTEM_PREFIX}${TEST_WAREHOUSE_DIR}; then
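
After dataload, one way to sanity-check that the bucket is actually guarded
is the s3guard CLI (bucket name taken from the environment; assumes the
bucket-info subcommand is available in the Hadoop version in use):

  hadoop s3guard bucket-info -guarded s3a://${S3_BUCKET}
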
diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py
new file mode 100644
index 0000000..6af28f4
--- /dev/null
+++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py
@@ -0,0 +1,111 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+
+kerberize = os.environ.get('IMPALA_KERBERIZE') == '1'
+target_filesystem = os.environ.get('TARGET_FILESYSTEM')
+
+compression_codecs = [
+  'org.apache.hadoop.io.compress.GzipCodec',
+  'org.apache.hadoop.io.compress.DefaultCodec',
+  'com.hadoop.compression.lzo.LzoCodec',
+  'com.hadoop.compression.lzo.LzopCodec',
+  'org.apache.hadoop.io.compress.BZip2Codec'
+]
+
+auth_to_local_rules = [
+  'RULE:[2:$1@$0](authtest@REALM.COM)s/(.*)@REALM.COM/auth_to_local_user/',
+  'RULE:[1:$1]',
+  'RULE:[2:$1]',
+  'DEFAULT'
+]
+
+CONFIG = {
+  'fs.defaultFS': '${DEFAULT_FS}',
+  'dfs.replication': '${HDFS_REPLICATION}',
+
+  # Compression codecs
+  'io.compression.codecs': ",".join(compression_codecs),
+  'io.compression.codec.lzo.class': 'com.hadoop.compression.lzo.LzoCodec',
+
+  # Set up proxyuser
+  'hadoop.proxyuser.${USER}.hosts': '*',
+  'hadoop.proxyuser.${USER}.groups': '*',
+
+  # Trash is enabled since some tests (in metadata/test_ddl.py) depend on it
+  # The trash interval is set to 1030 years to avoid checkpointing until 3000 AD
+  'fs.trash.interval': 541728000,
+
+  # AuthorizationTest depends on auth_to_local configs. These tests are run
+  # irrespective of whether kerberos is enabled.
+  'hadoop.security.auth_to_local': '\n'.join(auth_to_local_rules),
+
+  # Location of the KMS key provider
+  'hadoop.security.key.provider.path': 'kms://http@${INTERNAL_LISTEN_HOST}:9600/kms',
+
+  # Needed as long as multiple nodes are running on the same address. For Impala
+  # testing only.
+  'yarn.scheduler.include-port-in-node-name': 'true',
+
+  # ADLS configuration
+  # Note: This is needed even when not running on ADLS, because some frontend tests
+  # include ADLS paths that require initializing an ADLS filesystem. See
+  # ExplainTest.testScanNodeFsScheme().
+  'dfs.adls.oauth2.access.token.provider.type': 'ClientCredential',
+  'dfs.adls.oauth2.client.id': '${azure_client_id}',
+  'dfs.adls.oauth2.credential': '${azure_client_secret}',
+  'dfs.adls.oauth2.refresh.url':
+    'https://login.windows.net/${azure_tenant_id}/oauth2/token',
+
+  # ABFS configuration
+  # Note: This is needed even when not running on ABFS for the same reason as for ADLS.
+  # See ExplainTest.testScanNodeFsScheme().
+  'fs.azure.account.auth.type': 'OAuth',
+  'fs.azure.account.oauth.provider.type':
+    'org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider',
+  'fs.azure.account.oauth2.client.id': '${azure_client_id}',
+  'fs.azure.account.oauth2.client.secret': '${azure_client_secret}',
+  'fs.azure.account.oauth2.client.endpoint':
+    'https://login.microsoftonline.com/${azure_tenant_id}/oauth2/token',
+
+  # This property can be used in tests to ascertain that this core-site.xml from
+  # the classpath has been loaded. (Ex: TestRequestPoolService)
+  'impala.core-site.overridden': 'true',
+}
+
+if target_filesystem == 's3':
+  CONFIG.update({'fs.s3a.connection.maximum': 1500})
+  s3guard_enabled = os.environ.get("S3GUARD_ENABLED") == 'true'
+  if s3guard_enabled:
+    CONFIG.update({
+      'fs.s3a.metadatastore.impl':
+        'org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore',
+      'fs.s3a.s3guard.ddb.table': '${S3GUARD_DYNAMODB_TABLE}',
+      'fs.s3a.s3guard.ddb.region': '${S3GUARD_DYNAMODB_REGION}',
+    })
+
+if kerberize:
+  CONFIG.update({
+    'hadoop.security.authentication': 'kerberos',
+    'hadoop.security.authorization': 'true',
+    'hadoop.proxyuser.hive.hosts': '*',
+    'hadoop.proxyuser.hive.groups': '*',
+  })
diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.tmpl b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.tmpl
deleted file mode 100644
index 7eaff6f..0000000
--- a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.tmpl
+++ /dev/null
@@ -1,145 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<configuration>
-  <property>
-    <name>fs.defaultFS</name>
-    <value>${DEFAULT_FS}</value>
-  </property>
-
-  <property>
-    <name>dfs.replication</name>
-    <value>${HDFS_REPLICATION}</value>
-  </property>
-
- <property>
-    <name>io.compression.codecs</name>
-    <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.BZip2Codec</value>
-  </property>
-
-  <property>
-    <name>io.compression.codec.lzo.class</name>
-    <value>com.hadoop.compression.lzo.LzoCodec</value>
-  </property>
-
-  <property>
-    <name>hadoop.proxyuser.${USER}.hosts</name>
-    <value>*</value>
-  </property>
-
-  <property>
-  <!-- Trash is enabled since some tests (in metadata/test_ddl.py) depend on it.
-       It is set to 1030 years to avoid checkpointing until 3000 AD -->
-    <name>fs.trash.interval</name>
-    <value>541728000</value>
-  </property>
-
-  <property>
-  <!-- AuthorizationTest depends on auth_to_local configs. These tests are run
-       irrespective of whether kerberos is enabled. -->
-    <name>hadoop.security.auth_to_local</name>
-    <value>RULE:[2:$1@$0](authtest@REALM.COM)s/(.*)@REALM.COM/auth_to_local_user/
-RULE:[1:$1]
-RULE:[2:$1]
-DEFAULT</value>
-  </property>
-
-  <property>
-    <name>hadoop.proxyuser.${USER}.groups</name>
-    <value>*</value>
-  </property>
-
-  <!-- Needed as long as multiple nodes are running on the same address. For Impala
-       testing only -->
-  <property>
-    <name>yarn.scheduler.include-port-in-node-name</name>
-    <value>true</value>
-  </property>
-
- <property>
-    <name>fs.s3a.connection.maximum</name>
-    <value>1500</value>
-  </property>
-
-  <!-- Location of the KMS key provider -->
-  <property>
-    <name>hadoop.security.key.provider.path</name>
-    <value>kms://http@${INTERNAL_LISTEN_HOST}:9600/kms</value>
-  </property>
-
-  <!-- BEGIN Kerberos settings -->
-  <property>
-    <name>hadoop.security.authentication</name>
-    <value>kerberos</value>
-  </property>
-
-  <property>
-    <name>hadoop.security.authorization</name>
-    <value>true</value>
-  </property>
-
-  <property>
-    <name>hadoop.proxyuser.hive.hosts</name>
-    <value>*</value>
-  </property>
-
-  <property>
-    <name>hadoop.proxyuser.hive.groups</name>
-    <value>*</value>
-  </property>
-
-  <!-- END Kerberos settings -->
-
-  <property>
-    <name>dfs.adls.oauth2.access.token.provider.type</name>
-    <value>ClientCredential</value>
-  </property>
-
-  <property>
-    <name>dfs.adls.oauth2.client.id</name>
-    <value>${azure_client_id}</value>
-  </property>
-
-  <property>
-    <name>dfs.adls.oauth2.credential</name>
-    <value>${azure_client_secret}</value>
-  </property>
-
-  <property>
-    <name>dfs.adls.oauth2.refresh.url</name>
-    <value>https://login.windows.net/${azure_tenant_id}/oauth2/token</value>
-  </property>
-
-  <property>
-    <name>fs.azure.account.auth.type</name>
-    <value>OAuth</value>
-  </property>
-
-  <property>
-    <name>fs.azure.account.oauth.provider.type</name>
-    <value>org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider</value>
-  </property>
-
-  <property>
-    <name>fs.azure.account.oauth2.client.id</name>
-    <value>${azure_client_id}</value>
-  </property>
-
-  <property>
-    <name>fs.azure.account.oauth2.client.secret</name>
-    <value>${azure_client_secret}</value>
-  </property>
-
-  <property>
-    <name>fs.azure.account.oauth2.client.endpoint</name>
-    <value>https://login.microsoftonline.com/${azure_tenant_id}/oauth2/token</value>
-  </property>
-
-  <!-- This property can be used in tests to ascertain that this core-site.xml from
-   the classpath has been loaded. (Ex: TestRequestPoolService) -->
-  <property>
-    <name>impala.core-site.overridden</name>
-    <value>true</value>
-  </property>
-
-</configuration>
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index fffad06..bdd14dd 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -62,6 +62,7 @@ from tests.util.filesystem_utils import (
     IS_ABFS,
     IS_ADLS,
     S3_BUCKET_NAME,
+    S3GUARD_ENABLED,
     ADLS_STORE_NAME,
     FILESYSTEM_PREFIX,
     FILESYSTEM_NAME)
@@ -70,9 +71,8 @@ from tests.util.hdfs_util import (
   HdfsConfig,
   get_hdfs_client,
   get_hdfs_client_from_conf,
-  NAMENODE)
-from tests.util.s3_util import S3Client
-from tests.util.abfs_util import ABFSClient
+  NAMENODE,
+  HadoopFsCommandLineClient)
 from tests.util.test_file_parser import (
   QueryTestSectionReader,
   parse_query_test_file,
@@ -172,9 +172,12 @@ class ImpalaTestSuite(BaseTestSuite):
     cls.hdfs_client = cls.create_hdfs_client()
     cls.filesystem_client = cls.hdfs_client
     if IS_S3:
-      cls.filesystem_client = S3Client(S3_BUCKET_NAME)
+      # S3Guard needs filesystem operations to go through the s3 connector. Use the
+      # HDFS command line client.
+      cls.filesystem_client = HadoopFsCommandLineClient("S3")
     elif IS_ABFS:
-      cls.filesystem_client = ABFSClient()
+      # ABFS is implemented via the HDFS command line client.
+      cls.filesystem_client = HadoopFsCommandLineClient("ABFS")
     elif IS_ADLS:
       cls.filesystem_client = ADLSClient(ADLS_STORE_NAME)
 
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
index 9501775..a7a578c 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -176,7 +176,7 @@ class TestScannersFuzzing(ImpalaTestSuite):
     # Copy all of the local files and directories to hdfs.
     to_copy = ["%s/%s" % (tmp_table_dir, file_or_dir)
                for file_or_dir in os.listdir(tmp_table_dir)]
-    check_call(['hdfs', 'dfs', '-copyFromLocal'] + to_copy + [fuzz_table_location])
+    check_call(['hdfs', 'dfs', '-copyFromLocal', '-d'] + to_copy + [fuzz_table_location])
 
     if "SCANNER_FUZZ_KEEP_FILES" not in os.environ:
       shutil.rmtree(tmp_table_dir)
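
The '-d' flag makes copyFromLocal skip the intermediate "._COPYING_" file,
which avoids an extra rename on S3. A standalone example (destination path
hypothetical):

  hdfs dfs -copyFromLocal -d /tmp/local_data.parquet s3a://my-bucket/test-warehouse/example_tbl/
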
diff --git a/tests/util/abfs_util.py b/tests/util/abfs_util.py
deleted file mode 100644
index 8567888..0000000
--- a/tests/util/abfs_util.py
+++ /dev/null
@@ -1,113 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-# ABFS access utilities
-#
-# This file uses the Hadoop CLI to provide simple functions to the Impala test
-# suite to whatever the default filesystem is
-
-import re
-import subprocess
-import tempfile
-
-from tests.util.filesystem_base import BaseFilesystem
-
-
-class ABFSClient(BaseFilesystem):
-
-  def _hadoop_fs_shell(self, command):
-    hadoop_command = ['hadoop', 'fs'] + command
-    process = subprocess.Popen(hadoop_command,
-          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    stdout, stderr = process.communicate()
-    status = process.returncode
-    return (status, stdout, stderr)
-
-  def create_file(self, path, file_data, overwrite=True):
-    fixed_path = self._normalize_path(path)
-    if not overwrite and self.exists(fixed_path): return False
-    f = tempfile.NamedTemporaryFile(delete=False)
-    tmp_path = f.name
-    f.write(file_data)
-    f.close()
-    (status, stdout, stderr) = \
-        self._hadoop_fs_shell(['-put', tmp_path, fixed_path])
-    return status == 0
-
-  def make_dir(self, path, permission=None):
-    fixed_path = self._normalize_path(path)
-    self._hadoop_fs_shell(['-mkdir', '-p', fixed_path])
-    return True
-
-  def copy(self, src, dst):
-    fixed_src = self._normalize_path(src)
-    fixed_dst = self._normalize_path(dst)
-    (status, stdout, stderr) = \
-        self._hadoop_fs_shell(['-cp', fixed_src, fixed_dst])
-    assert status == 0, \
-        'ABFS copy failed: ' + stderr + "; " + stdout
-    assert self.exists(dst), \
-        'ABFS copy failed: Destination file {dst} does not exist'\
-            .format(dst=dst)
-
-  def _inner_ls(self, path):
-    fixed_path = self._normalize_path(path)
-    (status, stdout, stderr) = self._hadoop_fs_shell(['-ls', fixed_path])
-    # Trim the "Found X items" line and trailing new-line
-    entries = stdout.split("\n")[1:-1]
-    files = []
-    for entry in entries:
-      fields = re.split(" +", entry)
-      files.append({
-        'name': fields[7],
-        'length': int(fields[4]),
-        'mode': fields[0]
-      })
-    return files
-
-  def ls(self, path):
-    fixed_path = self._normalize_path(path)
-    files = []
-    for f in self._inner_ls(fixed_path):
-      fname = f['name'].split("/")[-1]
-      if not fname == '':
-        files += [fname]
-    return files
-
-  def exists(self, path):
-    fixed_path = self._normalize_path(path)
-    (status, stdout, stderr) = self._hadoop_fs_shell(['-test', '-e', fixed_path])
-    return status == 0
-
-  def delete_file_dir(self, path, recursive=False):
-    fixed_path = self._normalize_path(path)
-    rm_command = ['-rm', fixed_path]
-    if recursive:
-      rm_command = ['-rm', '-r', fixed_path]
-    (status, stdout, stderr) = self._hadoop_fs_shell(rm_command)
-    return status == 0
-
-  def get_all_file_sizes(self, path):
-    """Returns a list of integers which are all the file sizes of files found
-    under 'path'."""
-    fixed_path = self._normalize_path(path)
-    return [f['length'] for f in
-        self._inner_ls(fixed_path) if f['mode'][0] == "-"]
-
-  def _normalize_path(self, path):
-    # Paths passed in may lack a leading slash
-    return path if path.startswith('/') else '/' + path
diff --git a/tests/util/filesystem_utils.py b/tests/util/filesystem_utils.py
index 76212f6..5b39d36 100644
--- a/tests/util/filesystem_utils.py
+++ b/tests/util/filesystem_utils.py
@@ -44,6 +44,7 @@ ISILON_WEBHDFS_PORT = 8082
 
 # S3 specific values
 S3_BUCKET_NAME = os.getenv("S3_BUCKET")
+S3GUARD_ENABLED = os.getenv("S3GUARD_ENABLED") == "true"
 
 # ADLS / ABFS specific values
 ABFS_ACCOUNT_NAME = os.getenv("azure_storage_account_name")
diff --git a/tests/util/hdfs_util.py b/tests/util/hdfs_util.py
index 3c3a45e..f015f1f 100644
--- a/tests/util/hdfs_util.py
+++ b/tests/util/hdfs_util.py
@@ -19,15 +19,19 @@
 
 import getpass
 import httplib
+import os.path
+import re
 import requests
+import subprocess
+import tempfile
 from os import environ
-from os.path import join as join_path
 from pywebhdfs.webhdfs import PyWebHdfsClient, errors, _raise_pywebhdfs_exception
 from xml.etree.ElementTree import parse
 
 from tests.util.filesystem_base import BaseFilesystem
 from tests.util.filesystem_utils import FILESYSTEM_PREFIX
 
+
 class HdfsConfig(object):
   """Reads an XML configuration file (produced by a mini-cluster) into a dictionary
   accessible via get()"""
@@ -41,14 +45,16 @@ class HdfsConfig(object):
   def get(self, key):
     return self.conf.get(key)
 
+
 # Configuration object for the configuration that the minicluster will use.
-CORE_CONF = HdfsConfig(join_path(environ['HADOOP_CONF_DIR'], "core-site.xml"))
+CORE_CONF = HdfsConfig(os.path.join(environ['HADOOP_CONF_DIR'], "core-site.xml"))
 # NAMENODE is the path prefix that should be used in results, since paths that come
 # out of Impala have been qualified.  When running against the default filesystem,
 # this will be the same as fs.defaultFS.  When running against a secondary filesystem,
 # this will be the same as FILESYSTEM_PREFIX.
 NAMENODE = FILESYSTEM_PREFIX or CORE_CONF.get('fs.defaultFS')
 
+
 class PyWebHdfsClientWithChmod(PyWebHdfsClient, BaseFilesystem):
   def chmod(self, path, permission):
     """Set the permission of 'path' to 'permission' (specified as an octal string, e.g.
@@ -140,6 +146,113 @@ class PyWebHdfsClientWithChmod(PyWebHdfsClient, BaseFilesystem):
       return False
     return True
 
+
+class HadoopFsCommandLineClient(BaseFilesystem):
+  """This client is a wrapper around the hadoop fs command line. This is useful for
+  filesystems that rely on the logic in the Hadoop connector. For example, S3 with
+  S3Guard needs all accesses to go through the S3 connector. This is also useful for
+  filesystems that are fully served by this limited set of functionality (ABFS uses
+  this).
+  """
+
+  def __init__(self, filesystem_type="HDFS"):
+    # The filesystem_type is used only for providing more specific error messages.
+    self.filesystem_type = filesystem_type
+    super(HadoopFsCommandLineClient, self).__init__()
+
+  def _hadoop_fs_shell(self, command):
+    """Helper function wrapper around 'hadoop fs' takes in the arguments as a list."""
+    hadoop_command = ['hadoop', 'fs'] + command
+    process = subprocess.Popen(hadoop_command,
+          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    stdout, stderr = process.communicate()
+    status = process.returncode
+    return (status, stdout, stderr)
+
+  def create_file(self, path, file_data, overwrite=True):
+    """Creates a temporary file with the specified file_data on the local filesystem,
+    then puts it into the specified path."""
+    fixed_path = self._normalize_path(path)
+    if not overwrite and self.exists(fixed_path): return False
+    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
+      tmp_file.write(file_data)
+    (status, stdout, stderr) = \
+        self._hadoop_fs_shell(['-put', tmp_file.name, fixed_path])
+    return status == 0
+
+  def make_dir(self, path, permission=None):
+    """Create a directory at the specified path. Permissions are not supported."""
+    fixed_path = self._normalize_path(path)
+    (status, stdout, stderr) = self._hadoop_fs_shell(['-mkdir', '-p', fixed_path])
+    return status == 0
+
+  def copy(self, src, dst):
+    """Copy the source file to the destination."""
+    fixed_src = self._normalize_path(src)
+    fixed_dst = self._normalize_path(dst)
+    (status, stdout, stderr) = \
+        self._hadoop_fs_shell(['-cp', fixed_src, fixed_dst])
+    assert status == 0, \
+        '{0} copy failed: '.format(self.filesystem_type) + stderr + "; " + stdout
+    assert self.exists(dst), \
+        '{fs_type} copy failed: Destination file {dst} does not exist'\
+            .format(fs_type=self.filesystem_type, dst=dst)
+
+  def _inner_ls(self, path):
+    """List names, lengths, and mode for files/directories under the specified path."""
+    fixed_path = self._normalize_path(path)
+    (status, stdout, stderr) = self._hadoop_fs_shell(['-ls', fixed_path])
+    # Trim the "Found X items" line and trailing new-line
+    entries = stdout.split("\n")[1:-1]
+    files = []
+    for entry in entries:
+      fields = re.split(" +", entry)
+      files.append({
+        'name': fields[7],
+        'length': int(fields[4]),
+        'mode': fields[0]
+      })
+    return files
+
+  def ls(self, path):
+    """Returns a list of all file and directory names in 'path'"""
+    fixed_path = self._normalize_path(path)
+    files = []
+    for f in self._inner_ls(fixed_path):
+      fname = os.path.basename(f['name'])
+      if not fname == '':
+        files += [fname]
+    return files
+
+  def exists(self, path):
+    """Checks if a particular path exists"""
+    fixed_path = self._normalize_path(path)
+    (status, stdout, stderr) = self._hadoop_fs_shell(['-test', '-e', fixed_path])
+    return status == 0
+
+  def delete_file_dir(self, path, recursive=False):
+    """Delete the file or directory given by the specified path. Recursive must be true
+    for directories."""
+    fixed_path = self._normalize_path(path)
+    rm_command = ['-rm', fixed_path]
+    if recursive:
+      rm_command = ['-rm', '-r', fixed_path]
+    (status, stdout, stderr) = self._hadoop_fs_shell(rm_command)
+    return status == 0
+
+  def get_all_file_sizes(self, path):
+    """Returns a list of integers which are all the file sizes of files found
+    under 'path'."""
+    fixed_path = self._normalize_path(path)
+    return [f['length'] for f in
+        self._inner_ls(fixed_path) if f['mode'][0] == "-"]
+
+  def _normalize_path(self, path):
+    """Paths passed in may lack a leading slash. This adds a leading slash if it is
+    missing."""
+    return path if path.startswith('/') else '/' + path
+
+
 def get_hdfs_client_from_conf(conf):
   """Returns a new HTTP client for an HDFS cluster using an HdfsConfig object"""
   hostport = conf.get('dfs.namenode.http-address')
@@ -148,14 +261,17 @@ def get_hdfs_client_from_conf(conf):
   host, port = hostport.split(":")
   return get_hdfs_client(host=host, port=port)
 
+
 def get_hdfs_client(host, port, user_name=getpass.getuser()):
   """Returns a new HTTP client for an HDFS cluster using an explict host:port pair"""
   return PyWebHdfsClientWithChmod(host=host, port=port, user_name=user_name)
 
+
 def get_default_hdfs_config():
-  core_site_path = join_path(environ.get('HADOOP_CONF_DIR'), 'core-site.xml')
-  hdfs_site_path = join_path(environ.get('HADOOP_CONF_DIR'), 'hdfs-site.xml')
+  core_site_path = os.path.join(environ.get('HADOOP_CONF_DIR'), 'core-site.xml')
+  hdfs_site_path = os.path.join(environ.get('HADOOP_CONF_DIR'), 'hdfs-site.xml')
   return HdfsConfig(core_site_path, hdfs_site_path)
 
+
 def create_default_hdfs_client():
   return get_hdfs_client_from_conf(get_default_hdfs_config())
diff --git a/tests/util/s3_util.py b/tests/util/s3_util.py
deleted file mode 100644
index ec5cfdf..0000000
--- a/tests/util/s3_util.py
+++ /dev/null
@@ -1,103 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-# S3 access utilities
-#
-# This file uses the boto3 client and provides simple functions to the Impala test suite
-# to access Amazon S3.
-
-import boto3
-from tests.util.filesystem_base import BaseFilesystem
-
-class S3Client(BaseFilesystem):
-
-  @classmethod
-  def __init__(self, bucket):
-    self.bucketname = bucket
-    self.s3 = boto3.resource('s3')
-    self.bucket = self.s3.Bucket(self.bucketname)
-    self.s3client = boto3.client('s3')
-
-  def create_file(self, path, file_data, overwrite=True):
-    if not overwrite and self.exists(path): return False
-    self.s3client.put_object(Bucket=self.bucketname, Key=path, Body=file_data)
-    return True
-
-  def make_dir(self, path, permission=None):
-    # This function is a no-op. S3 is a key-value store and does not have a directory
-    # structure. We can use a non existant path as though it already exists.
-    pass
-
-  def copy(self, src, dst):
-    self.s3client.copy_object(Bucket=self.bucketname,
-                              CopySource={'Bucket':self.bucketname, 'Key':src}, Key=dst)
-    assert self.exists(dst), \
-        'S3 copy failed: Destination file {dst} does not exist'.format(dst=dst)
-
-  # Since S3 is a key-value store, it does not have a command like 'ls' for a directory
-  # structured filesystem. It lists everything under a path recursively.
-  # We have to manipulate its response to get an 'ls' like output.
-  def ls(self, path):
-    if not path.endswith('/'):
-      path += '/'
-    # Use '/' as a delimiter so that we don't get all keys under a path recursively.
-    response = self.s3client.list_objects(
-        Bucket=self.bucketname, Prefix=path, Delimiter='/')
-    dirs = []
-    # Non-keys or "directories" will be listed as 'Prefix' under 'CommonPrefixes'.
-    if 'CommonPrefixes' in response:
-      dirs = [t['Prefix'] for t in response['CommonPrefixes']]
-    files = []
-    # Keys or "files" will be listed as 'Key' under 'Contents'.
-    if 'Contents' in response:
-      files = [t['Key'] for t in response['Contents']]
-    files_and_dirs = []
-    files_and_dirs.extend([d.split('/')[-2] for d in dirs])
-    for f in files:
-      key = f.split("/")[-1]
-      if not key == '':
-        files_and_dirs += [key]
-    return files_and_dirs
-
-  def get_all_file_sizes(self, path):
-    if not path.endswith('/'):
-      path += '/'
-    # Use '/' as a delimiter so that we don't get all keys under a path recursively.
-    response = self.s3client.list_objects(
-        Bucket=self.bucketname, Prefix=path, Delimiter='/')
-    if 'Contents' in response:
-      return [t['Size'] for t in response['Contents']]
-    return []
-
-  def exists(self, path):
-    response = self.s3client.list_objects(Bucket=self.bucketname,Prefix=path)
-    return response.get('Contents') is not None
-
-  # Helper function which lists keys in a path. Should not be used by the tests directly.
-  def _list_keys(self, path):
-    if not self.exists(path):
-      return False
-    response = self.s3client.list_objects(Bucket=self.bucketname, Prefix=path)
-    contents = response.get('Contents')
-    return [c['Key'] for c in contents]
-
-  def delete_file_dir(self, path, recursive=False):
-    if not self.exists(path):
-      return True
-    objects = [{'Key': k} for k in self._list_keys(path)] if recursive else path
-    self.s3client.delete_objects(Bucket=self.bucketname, Delete={'Objects':objects})
-    return True