You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2020/08/06 20:12:53 UTC

[impala] branch master updated (c413f9b -> dbbd403)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from c413f9b  IMPALA-10047: Revert core piece of IMPALA-6984
     new 87aeb2a  IMPALA-9963: Implement ds_kll_n() function
     new dbbd403  IMPALA-10005: Fix Snappy decompression for non-block filesystems

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exprs/datasketches-common.h                 |   2 +-
 be/src/exprs/datasketches-functions-ir.cc          |  11 ++
 be/src/exprs/datasketches-functions.h              |   5 +
 common/function-registry/impala_functions.py       |   2 +
 .../org/apache/impala/catalog/HdfsCompression.java |  20 +-
 .../queries/QueryTest/datasketches-kll.test        |  37 ++++
 tests/query_test/test_compressed_formats.py        | 202 +++++++++++++--------
 7 files changed, 191 insertions(+), 88 deletions(-)


[impala] 01/02: IMPALA-9963: Implement ds_kll_n() function

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 87aeb2ad78e2106f1d8df84d4d84975c7cde5b5a
Author: Gabor Kaszab <ga...@cloudera.com>
AuthorDate: Thu Jul 30 09:41:00 2020 +0200

    IMPALA-9963: Implement ds_kll_n() function
    
    This function receives a serialized Apache DataSketches KLL sketch
    and returns how many input values were fed into this sketch.
    
    Change-Id: I166e87a468e68e888ac15fca7429ac2552dbb781
    Reviewed-on: http://gerrit.cloudera.org:8080/16259
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/datasketches-common.h                 |  2 +-
 be/src/exprs/datasketches-functions-ir.cc          | 11 +++++++
 be/src/exprs/datasketches-functions.h              |  5 +++
 common/function-registry/impala_functions.py       |  2 ++
 .../queries/QueryTest/datasketches-kll.test        | 37 ++++++++++++++++++++++
 5 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/be/src/exprs/datasketches-common.h b/be/src/exprs/datasketches-common.h
index 7560692..37a6458 100644
--- a/be/src/exprs/datasketches-common.h
+++ b/be/src/exprs/datasketches-common.h
@@ -37,7 +37,7 @@ const int DS_SKETCH_CONFIG = 12;
 /// Logs a common error message saying that sketch deserialization failed.
 void LogSketchDeserializationError(FunctionContext* ctx);
 
-/// Receives a serialized DataSketches sketch  (either Hll or KLL) in
+/// Receives a serialized DataSketches sketch (either Hll or KLL) in
 /// 'serialized_sketch', deserializes it and puts the deserialized sketch into 'sketch'.
 /// The outgoing 'sketch' will hold the same configs as 'serialized_sketch' regardless of
 /// what was provided when it was constructed before this function call. Returns false if
diff --git a/be/src/exprs/datasketches-functions-ir.cc b/be/src/exprs/datasketches-functions-ir.cc
index d2898bc..b76cbe9 100644
--- a/be/src/exprs/datasketches-functions-ir.cc
+++ b/be/src/exprs/datasketches-functions-ir.cc
@@ -59,5 +59,16 @@ FloatVal DataSketchesFunctions::DsKllQuantile(FunctionContext* ctx,
   }
 }
 
+BigIntVal DataSketchesFunctions::DsKllN(FunctionContext* ctx,
+    const StringVal& serialized_sketch) {
+  if (serialized_sketch.is_null || serialized_sketch.len == 0) return BigIntVal::null();
+  datasketches::kll_sketch<float> sketch;
+  if (!DeserializeDsSketch(serialized_sketch, &sketch)) {
+    LogSketchDeserializationError(ctx);
+    return BigIntVal::null();
+  }
+  return sketch.get_n();
+}
+
 }
 
diff --git a/be/src/exprs/datasketches-functions.h b/be/src/exprs/datasketches-functions.h
index 143fd69..bd6b76c 100644
--- a/be/src/exprs/datasketches-functions.h
+++ b/be/src/exprs/datasketches-functions.h
@@ -42,6 +42,11 @@ public:
   /// of [0,1]. Otherwise this function returns error.
   static FloatVal DsKllQuantile(FunctionContext* ctx, const StringVal& serialized_sketch,
       const DoubleVal& rank);
+
+  /// 'serialized_sketch' is expected as a serialized Apache DataSketches KLL sketch. If
+  /// it is not, then the query fails.
+  /// Returns the number of input values fed to 'serialized_sketch'.
+  static BigIntVal DsKllN(FunctionContext* ctx, const StringVal& serialized_sketch);
 };
 
 }
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index 8398785..fbed357 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -935,6 +935,8 @@ visible_functions = [
       '_ZN6impala21DataSketchesFunctions13DsHllEstimateEPN10impala_udf15FunctionContextERKNS1_9StringValE'],
   [['ds_kll_quantile'], 'FLOAT', ['STRING', 'DOUBLE'],
       '_ZN6impala21DataSketchesFunctions13DsKllQuantileEPN10impala_udf15FunctionContextERKNS1_9StringValERKNS1_9DoubleValE'],
+  [['ds_kll_n'], 'BIGINT', ['STRING'],
+      '_ZN6impala21DataSketchesFunctions6DsKllNEPN10impala_udf15FunctionContextERKNS1_9StringValE'],
 ]
 
 invisible_functions = [
diff --git a/testdata/workloads/functional-query/queries/QueryTest/datasketches-kll.test b/testdata/workloads/functional-query/queries/QueryTest/datasketches-kll.test
index b7b734b..ee240bf 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/datasketches-kll.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/datasketches-kll.test
@@ -144,3 +144,40 @@ FLOAT,FLOAT,FLOAT,FLOAT,FLOAT,FLOAT
 ---- RESULTS
 100.1999969482422,25000.099609375,50.90000152587891,NULL,50.5,NULL
 ====
+---- QUERY
+# Check that ds_kll_n() returns null for an empty sketch.
+select ds_kll_n(ds_kll_sketch(cast(f2 as float))) from functional_parquet.emptytable;
+---- RESULTS
+NULL
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Check that ds_kll_n() returns null for a null input.
+select ds_kll_n(c) from functional_parquet.nulltable;
+---- RESULTS
+NULL
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Check that ds_kll_n() returns error for strings that are not serialized sketches.
+select ds_kll_n(date_string_col) from functional_parquet.alltypestiny;
+---- CATCH
+UDF ERROR: Unable to deserialize sketch
+====
+---- QUERY
+select ds_kll_n(float_sketch) from sketch_store where year=2009 and month=1;
+---- RESULTS
+25
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Check that ds_kll_n() works on sketches created by Hive.
+select ds_kll_n(f) from kll_sketches_from_hive;
+---- RESULTS
+6
+---- TYPES
+BIGINT
+====


[impala] 02/02: IMPALA-10005: Fix Snappy decompression for non-block filesystems

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit dbbd40308a6d1cef77bfe45e016e775c918e0539
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Thu Jul 23 20:44:30 2020 -0700

    IMPALA-10005: Fix Snappy decompression for non-block filesystems
    
    Snappy-compressed text always uses THdfsCompression::SNAPPY_BLOCKED
    type compression in the backend. However, for non-block filesystems,
    the frontend is incorrectly passing THdfsCompression::SNAPPY instead.
    On debug builds, this leads to a DCHECK when trying to read
    Snappy-compressed text. On release builds, it fails to decompress
    the data.
    
    This fixes the frontend to always pass THdfsCompression::SNAPPY_BLOCKED
    for Snappy-compressed text.
    
    This reworks query_test/test_compressed_formats.py to provide better
    coverage:
     - Changed the RC and Seq test cases to verify that the file extension
       doesn't matter. Added Avro to this case as well.
     - Fixed the text case to use appropriate extensions (fixing IMPALA-9004)
     - Changed the utility function so it doesn't use Hive. This allows it
       to be enabled on non-HDFS filesystems like S3.
     - Changed the test to use unique_database and allow parallel execution.
     - Changed the test to run in the core job, so it now has coverage on
       the usual S3 test configuration. It is reasonably quick (1-2 minutes)
       and runs in parallel.
    
    Testing:
     - Exhaustive job
     - Core S3 job
     - Changed the frontend locally (not part of this commit) to force it to use
       the code for non-block filesystems (i.e. the TFileSplitGeneratorSpec code)
       and verified that it is now able to read Snappy-compressed text.
    
    Change-Id: I0879f2fc0bf75bb5c15cecb845ece46a901601ac
    Reviewed-on: http://gerrit.cloudera.org:8080/16278
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Sahil Takiar <st...@cloudera.com>
---
 .../org/apache/impala/catalog/HdfsCompression.java |  20 +-
 tests/query_test/test_compressed_formats.py        | 202 +++++++++++++--------
 2 files changed, 135 insertions(+), 87 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
index df76463..153106d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
@@ -24,13 +24,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 
 /**
- * Support for recognizing compression suffixes on data files.
+ * Support for recognizing compression suffixes on data files. This is currently
+ * limited to text files. Other file formats embed metadata about the compression
+ * type and do not use the file suffixes.
  * Compression of a file is recognized in mapreduce by looking for suffixes of
  * supported codecs.
- * For now Impala supports GZIP, SNAPPY, BZIP2 and some additional formats if plugins
- * are available. Even if a plugin is available, we need to add the file suffixes here so
- * that we can resolve the compression type from the file name. LZO can use the specific
- * HIVE input class.
+ * For now Impala supports GZIP, SNAPPY_BLOCKED, BZIP2 and some additional formats if
+ * plugins are available. Even if a plugin is available, we need to add the file suffixes
+ * here so that we can resolve the compression type from the file name. LZO can use the
+ * specific HIVE input class.
  * Some compression types here are detected even though they are not supported. This
  * allows for better error messages (e.g. LZ4, LZO).
  */
@@ -39,7 +41,7 @@ public enum HdfsCompression {
   DEFLATE,
   GZIP,
   BZIP2,
-  SNAPPY,
+  SNAPPY_BLOCKED,
   LZO,
   LZO_INDEX, //Lzo index file.
   LZ4,
@@ -51,7 +53,7 @@ public enum HdfsCompression {
           put("deflate", DEFLATE).
           put("gz", GZIP).
           put("bz2", BZIP2).
-          put("snappy", SNAPPY).
+          put("snappy", SNAPPY_BLOCKED).
           put("lzo", LZO).
           put("index", LZO_INDEX).
           put("lz4", LZ4).
@@ -76,7 +78,7 @@ public enum HdfsCompression {
     case DEFLATE: return THdfsCompression.DEFLATE;
     case GZIP: return THdfsCompression.GZIP;
     case BZIP2: return THdfsCompression.BZIP2;
-    case SNAPPY: return THdfsCompression.SNAPPY_BLOCKED;
+    case SNAPPY_BLOCKED: return THdfsCompression.SNAPPY_BLOCKED;
     case LZO: return THdfsCompression.LZO;
     case LZ4: return THdfsCompression.LZ4;
     case ZSTD: return THdfsCompression.ZSTD;
@@ -90,7 +92,7 @@ public enum HdfsCompression {
       case DEFLATE: return FbCompression.DEFLATE;
       case GZIP: return FbCompression.GZIP;
       case BZIP2: return FbCompression.BZIP2;
-      case SNAPPY: return FbCompression.SNAPPY;
+      case SNAPPY_BLOCKED: return FbCompression.SNAPPY_BLOCKED;
       case LZO: return FbCompression.LZO;
       case LZ4: return FbCompression.LZ4;
       case ZSTD: return FbCompression.ZSTD;
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index 8aa4705..f295bed 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -18,13 +18,12 @@
 import math
 import os
 import pytest
+import random
 import struct
 import subprocess
 from os.path import join
 
-from tests.common.environ import EXTERNAL_WAREHOUSE_DIR
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
 from tests.common.test_dimensions import create_single_exec_option_dimension
 from tests.common.test_result_verifier import verify_query_result_is_equal
 from tests.common.test_vector import ImpalaTestDimension
@@ -33,25 +32,19 @@ from tests.util.filesystem_utils import get_fs_path
 
 # (file extension, table suffix) pairs
 compression_formats = [
-  ('.bz2',     'bzip'),
+  ('.bz2', 'bzip'),
   ('.deflate', 'def'),
-  ('.gz',      'gzip'),
-  ('.snappy',  'snap'),
+  ('.gz', 'gzip'),
+  ('.snappy', 'snap'),
  ]
 
+compression_extensions = ['.bz2', '.deflate', '.gz', '.snappy']
 
-# Missing Coverage: Compressed data written by Hive is queriable by Impala on a non-hdfs
-# filesystem.
-@SkipIfS3.hive
-@SkipIfABFS.hive
-@SkipIfADLS.hive
-@SkipIfIsilon.hive
-@SkipIfLocal.hive
-class TestCompressedFormats(ImpalaTestSuite):
+
+class TestCompressedFormatsBase(ImpalaTestSuite):
   """
-  Tests that we support compressed RC, sequence and text files and that unsupported
-  formats fail gracefully (see IMPALA-14: Files with .gz extension reported as 'not
-  supported').
+  Base class to provide utility functions for testing support for compressed
+  data files.
   """
   @classmethod
   def get_workload(self):
@@ -59,64 +52,43 @@ class TestCompressedFormats(ImpalaTestSuite):
 
   @classmethod
   def add_test_dimensions(cls):
-    super(TestCompressedFormats, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.clear()
-    cls.ImpalaTestMatrix.add_dimension(\
-        ImpalaTestDimension('file_format', *['rc', 'seq', 'text']))
-    cls.ImpalaTestMatrix.add_dimension(\
-        ImpalaTestDimension('compression_format', *compression_formats))
-    if cls.exploration_strategy() == 'core':
-      # Don't run on core.  This test is very slow and we are unlikely
-      # to regress here.
-      cls.ImpalaTestMatrix.add_constraint(lambda v: False);
+    super(TestCompressedFormatsBase, cls).add_test_dimensions()
 
-  @pytest.mark.execute_serially
-  def test_compressed_formats(self, vector):
-    file_format = vector.get_value('file_format')
-    extension, suffix = vector.get_value('compression_format')
-    if file_format in ['rc', 'seq']:
-      # Test that {gzip,snappy,bzip,deflate}-compressed
-      # {RC,sequence,text} files are supported.
-      db_suffix = '_%s_%s' % (file_format, suffix)
-      self._copy_and_query_compressed_file(
-        'tinytable', db_suffix, suffix, '000000_0', extension)
-    elif file_format is 'text':
-        pytest.xfail('IMPALA-9004: TestCompressedFormats is broken for text files')
-    else:
-      assert False, "Unknown file_format: %s" % file_format
-
-  # TODO: switch to using hive metastore API rather than hive shell.
-  def _copy_and_query_compressed_file(self, table_name, db_suffix, compression_codec,
-                                     file_name, extension, expected_error=None):
-    # We want to create a test table with a compressed file that has a file
-    # extension. We'll do this by making a copy of an existing table with hive.
+  def _copy_and_query_compressed_file(self, unique_database, table_name, db_suffix,
+      file_basename, src_extension, dest_extension, expected_error=None):
+    """
+    This is a utility function to test the behavior for compressed file formats
+    with different file extensions. It creates a new table in the unique_database
+    as a copy of the provided table_name from the functional schema with the
+    specified db_suffix. It copies file_basename + src_extension to the new
+    table as file_basename + dest_extension. It then runs a query on the
+    new table. Unless expected_error is set, it expects the query to run successfully.
+    """
+    # Calculate locations for the source table
     base_dir = '/test-warehouse'
-    src_table = 'functional%s.%s' % (db_suffix, table_name)
-    src_table_dir = "%s%s" % (table_name, db_suffix)
-    src_table_dir = join(base_dir, src_table_dir)
-    src_file = join(src_table_dir, file_name)
-
-    # Make sure destination table uses suffix, even if use_suffix=False, so
-    # unique tables are created for each compression format
-    # In Hive3+ create table like behavior is still in discussion, add location
-    # to avoid impact on Impala test.
-    dest_base_dir = '/{0}'.format(EXTERNAL_WAREHOUSE_DIR)
-    dest_table = '%s_%s_copy' % (table_name, compression_codec)
-    dest_table_dir = join(dest_base_dir, dest_table)
-    dest_file = join(dest_table_dir, file_name + extension)
-
-    drop_cmd = 'DROP TABLE IF EXISTS %s;' % (dest_table)
-    hive_cmd = drop_cmd + 'CREATE TABLE %s LIKE %s LOCATION \'%s\';' % \
-      (dest_table, src_table, dest_table_dir)
-
-    # Create the table
-    self.run_stmt_in_hive(hive_cmd)
+    src_table = "functional{0}.{1}".format(db_suffix, table_name)
+    src_table_dir = join(base_dir, table_name + db_suffix)
+    src_file = join(src_table_dir, file_basename + src_extension)
+
+    # Calculate locations for the destination table
+    dest_table_dir = "/test-warehouse/{0}.db/{1}".format(unique_database, table_name)
+    dest_table = "{0}.{1}".format(unique_database, table_name)
+    dest_file = join(dest_table_dir, file_basename + dest_extension)
+
+    # Use a specific location to avoid any interactions with Hive behavior changes.
+    drop_cmd = "DROP TABLE IF EXISTS {0};".format(dest_table)
+    create_cmd = "CREATE TABLE {0} LIKE {1} LOCATION \'{2}\';".format(
+      dest_table, src_table, dest_table_dir)
+
+    # Create the table and copy in the data file
+    self.client.execute(drop_cmd)
+    self.client.execute(create_cmd)
     self.filesystem_client.copy(src_file, dest_file, overwrite=True)
     # Try to read the compressed file with extension
-    query = 'select count(*) from %s' % dest_table
+    query = "select count(*) from {0};".format(dest_table)
     try:
-      # Need to invalidate the metadata because the table was created external to Impala.
-      self.client.execute("invalidate metadata %s" % dest_table)
+      # Need to refresh the metadata to see the file copied in.
+      self.client.execute("refresh {0}".format(dest_table))
       result = self.execute_scalar(query)
       # Fail iff we expected an error
       assert expected_error is None, 'Query is expected to fail'
@@ -125,12 +97,84 @@ class TestCompressedFormats(ImpalaTestSuite):
       error_msg = str(e)
       print error_msg
       if expected_error is None or expected_error not in error_msg:
-        print "Unexpected error:\n%s", error_msg
+        print("Unexpected error:\n{0}".format(error_msg))
         raise
     finally:
-      self.run_stmt_in_hive(drop_cmd)
+      self.client.execute(drop_cmd)
       self.filesystem_client.delete_file_dir(dest_file)
 
+
+class TestCompressedNonText(TestCompressedFormatsBase):
+  """Tests behavior for compressed non-text formats (avro, rc, seq)."""
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestCompressedNonText, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.clear()
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('file_format', *['rc', 'seq', 'avro']))
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('compression_format', *compression_formats))
+
+  def test_insensitivity_to_extension(self, vector, unique_database):
+    """
+    Avro, RC, and Sequence files do not use the file extension to determine the
+    type of compression. This verifies that they work regardless of the
+    extension.
+    """
+    file_format = vector.get_value('file_format')
+    right_extension, suffix = vector.get_value('compression_format')
+    # Avro is only loaded in a subset of the compression types. Bail out for
+    # the ones that are not loaded.
+    if file_format == 'avro' and suffix not in ['def', 'snap']:
+      pytest.xfail('Avro is only created for Deflate and Snappy compression codecs')
+    db_suffix = '_{0}_{1}'.format(file_format, suffix)
+
+    # Pick one wrong extension randomly
+    wrong_extensions = [ext for ext in compression_extensions if ext != right_extension]
+    random.shuffle(wrong_extensions)
+    wrong_extension = wrong_extensions[0]
+    # Test with the "right" extension that matches the file's compression, one wrong
+    # extension, and no extension. By default, Hive does not use a file extension.
+    src_extension = ""
+    for ext in [right_extension, wrong_extension, ""]:
+      self._copy_and_query_compressed_file(
+        unique_database, 'tinytable', db_suffix, '000000_0', src_extension, ext)
+
+
+class TestCompressedText(TestCompressedFormatsBase):
+  """
+  Tests behavior for compressed text files, which determine the compression codec from
+  the file extension.
+  """
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestCompressedText, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.clear()
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('file_format', *['text']))
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('compression_format', *compression_formats))
+
+  def test_correct_extension(self, vector, unique_database):
+    """
+    Text files use the file extension to determine the type of compression.
+    By default, Hive creates files with the appropriate file extension.
+    This verifies the positive case that the correct extension works.
+
+    This is a somewhat trivial test. However, it runs with the core exploration
+    strategy and runs on all filesystems. It verifies formats on core that are
+    otherwise limited to exhaustive. That is important for coverage on non-HDFS
+    filesystems like s3.
+    """
+    file_format = vector.get_value('file_format')
+    extension, suffix = vector.get_value('compression_format')
+    db_suffix = '_{0}_{1}'.format(file_format, suffix)
+    self._copy_and_query_compressed_file(
+      unique_database, 'tinytable', db_suffix, '000000_0', extension, extension)
+
+
 class TestUnsupportedTableWriters(ImpalaTestSuite):
   @classmethod
   def get_workload(cls):
@@ -143,7 +187,7 @@ class TestUnsupportedTableWriters(ImpalaTestSuite):
     # This class tests different formats, but doesn't use constraints.
     # The constraint added below is only to make sure that the test file runs once.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        (v.get_value('table_format').file_format =='text' and
+        (v.get_value('table_format').file_format == 'text' and
         v.get_value('table_format').compression_codec == 'none'))
 
   def test_error_message(self, vector, unique_database):
@@ -151,6 +195,7 @@ class TestUnsupportedTableWriters(ImpalaTestSuite):
     # compressed text, avro and sequence.
     self.run_test_case('QueryTest/unsupported-writers', vector, unique_database)
 
+
 @pytest.mark.execute_serially
 class TestLargeCompressedFile(ImpalaTestSuite):
   """
@@ -182,7 +227,7 @@ class TestLargeCompressedFile(ImpalaTestSuite):
     if cls.exploration_strategy() != 'exhaustive':
       pytest.skip("skipping if it's not exhaustive test.")
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        (v.get_value('table_format').file_format =='text' and
+        (v.get_value('table_format').file_format == 'text' and
         v.get_value('table_format').compression_codec == 'snap'))
 
   def teardown_method(self, method):
@@ -228,24 +273,25 @@ class TestLargeCompressedFile(ImpalaTestSuite):
     hdfs_put.wait()
 
   def test_query_large_file(self, vector):
-    self.__create_test_table();
+    self.__create_test_table()
     dst_path = "%s/%s" % (self.TABLE_LOCATION, self.FILE_NAME)
     file_size = self.MAX_FILE_SIZE
     self.__generate_file(dst_path, file_size)
     self.client.execute("refresh %s" % self.TABLE_NAME)
 
     # Query the table
-    result = self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
+    self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
 
   def __create_test_table(self):
     self.__drop_test_table()
-    self.client.execute("CREATE TABLE %s (col string) " \
+    self.client.execute("CREATE TABLE %s (col string) "
         "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '%s'"
         % (self.TABLE_NAME, self.TABLE_LOCATION))
 
   def __drop_test_table(self):
     self.client.execute("DROP TABLE IF EXISTS %s" % self.TABLE_NAME)
 
+
 class TestBzip2Streaming(ImpalaTestSuite):
   MAX_SCAN_RANGE_LENGTHS = [0, 5]
 
@@ -261,8 +307,8 @@ class TestBzip2Streaming(ImpalaTestSuite):
       pytest.skip("skipping if it's not exhaustive test.")
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('max_scan_range_length', *cls.MAX_SCAN_RANGE_LENGTHS))
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').file_format == 'text' and\
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'text' and
         v.get_value('table_format').compression_codec == 'bzip')
 
   def test_bzip2_streaming(self, vector):