You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by he...@apache.org on 2017/01/12 23:46:52 UTC

[1/4] incubator-impala git commit: IMPALA-4722: Disable log caching in test_scratch_disk

Repository: incubator-impala
Updated Branches:
  refs/heads/master 1a611b393 -> a81ad5eaa


IMPALA-4722: Disable log caching in test_scratch_disk

test_scratch_disk fails sporadically when trying to assert the presence
of log messages. This is probably caused by log caching, since after
such failures the log files do contain the lines in question.

I manually tested this by running the tests repeatedly for 2 days (10k
runs).

To make future diagnosis of similar problems easier, this change also
adds more output to assert_impalad_log_contains().

Change-Id: I9f21284338ee7b4374aca249b6556282b0148389
Reviewed-on: http://gerrit.cloudera.org:8080/5669
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/8b7f8766
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/8b7f8766
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/8b7f8766

Branch: refs/heads/master
Commit: 8b7f876649c34dec7fee1dea34f5acf4db1c037d
Parents: 1a611b3
Author: Lars Volker <lv...@cloudera.com>
Authored: Tue Jan 10 11:57:19 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Jan 12 18:58:48 2017 +0000

----------------------------------------------------------------------
 tests/common/custom_cluster_test_suite.py | 12 ++++++++----
 tests/custom_cluster/test_scratch_disk.py | 12 +++++++-----
 2 files changed, 15 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b7f8766/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 4bbef14..365776a 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -127,8 +127,11 @@ class CustomClusterTestSuite(ImpalaTestSuite):
 
   def assert_impalad_log_contains(self, level, line_regex, expected_count=1):
     """
-    Assert that impalad log with specified level (e.g. ERROR, WARNING, INFO)
-    contains expected_count lines with a substring matching the regex.
+    Assert that impalad log with specified level (e.g. ERROR, WARNING, INFO) contains
+    expected_count lines with a substring matching the regex. When using this method to
+    check log files of running processes, the caller should make sure that log buffering
+    has been disabled, for example by adding '-logbuflevel=-1' to the daemon startup
+    options.
     """
     pattern = re.compile(line_regex)
     found = 0
@@ -139,5 +142,6 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       for line in log_file:
         if pattern.search(line):
           found += 1
-    assert found == expected_count, ("Expected %d lines in file %s matching regex '%s'"\
-        + ", but found %d lines") % (expected_count, log_file_path, line_regex, found)
+    assert found == expected_count, ("Expected %d lines in file %s matching regex '%s'"
+        + ", but found %d lines. Last line was: \n%s") % (expected_count, log_file_path,
+                                                          line_regex, found, line)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b7f8766/tests/custom_cluster/test_scratch_disk.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index f523dbe..7e02de5 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -83,7 +83,7 @@ class TestScratchDir(CustomClusterTestSuite):
         scratch because all directories are on same disk."""
     normal_dirs = self.generate_dirs(5)
     self._start_impala_cluster([
-      '--impalad_args="-scratch_dirs={0}"'.format(','.join(normal_dirs))])
+      '--impalad_args="-logbuflevel=-1 -scratch_dirs={0}"'.format(','.join(normal_dirs))])
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=1)
     exec_option = vector.get_value('exec_option')
@@ -96,7 +96,7 @@ class TestScratchDir(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   def test_no_dirs(self, vector):
     """ Test we can execute a query with no scratch dirs """
-    self._start_impala_cluster(['--impalad_args="-scratch_dirs="'])
+    self._start_impala_cluster(['--impalad_args="-logbuflevel=-1 -scratch_dirs="'])
     self.assert_impalad_log_contains("WARNING",
         "Running without spill to disk: no scratch directories provided\.")
     exec_option = vector.get_value('exec_option')
@@ -113,7 +113,8 @@ class TestScratchDir(CustomClusterTestSuite):
     """ Test we can execute a query with only bad non-writable scratch """
     non_writable_dirs = self.generate_dirs(5, writable=False)
     self._start_impala_cluster([
-      '--impalad_args="-scratch_dirs={0}"'.format(','.join(non_writable_dirs))])
+      '--impalad_args="-logbuflevel=-1 -scratch_dirs={0}"'.format(
+      ','.join(non_writable_dirs))])
     self.assert_impalad_log_contains("ERROR", "Running without spill to disk: could "
         + "not use any scratch directories in list:.*. See previous "
         + "warnings for information on causes.")
@@ -134,7 +135,8 @@ class TestScratchDir(CustomClusterTestSuite):
     """ Test that non-existing directories are not created or used """
     non_existing_dirs = self.generate_dirs(5, non_existing=True)
     self._start_impala_cluster([
-      '--impalad_args="-scratch_dirs={0}"'.format(','.join(non_existing_dirs))])
+      '--impalad_args="-logbuflevel=-1 -scratch_dirs={0}"'.format(
+      ','.join(non_existing_dirs))])
     self.assert_impalad_log_contains("ERROR", "Running without spill to disk: could "
         + "not use any scratch directories in list:.*. See previous "
         + "warnings for information on causes.")
@@ -157,7 +159,7 @@ class TestScratchDir(CustomClusterTestSuite):
         have permissions changed or are removed after impalad startup."""
     dirs = self.generate_dirs(3);
     self._start_impala_cluster([
-      '--impalad_args="-scratch_dirs={0}"'.format(','.join(dirs)),
+      '--impalad_args="-logbuflevel=-1 -scratch_dirs={0}"'.format(','.join(dirs)),
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'])
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(dirs))


[3/4] incubator-impala git commit: IMPALA-4355: random query generator: modify statement execution flow to support DML

Posted by he...@apache.org.
IMPALA-4355: random query generator: modify statement execution flow to support DML

- Rework the discrepancy searcher to run DML statements. We do this by
  using the query profile to choose a table, copy that table, and
  generate a statement that will INSERT into that copy. We chose a slow
  copy over other methods because INSERTing into a copy is a more
  reliable test that prevents table sizes from getting out of hand or
  time-consuming replay to reproduce a particular statement.

- Introduce a statement generator stub. The real generator work is
  tracked in IMPALA-4351 and IMPALA-4353. Here we simply generate a
  basic INSERT INTO ... VALUES statement to make sure our general query
  execution flow is working.

- Add query profile stub for DML statements (INSERT-only at this time).
  Since we'll want INSERT INTO ... SELECT very soon, this inherits from
  DefaultProfile. Also add building blocks for choosing random
  statements in the DefaultProfile.

- Improve the concept of an "execution mode" and add new modes. Before,
  we had "RAW", "CREATE_TABLE_AS", and "CREATE_VIEW_AS". The idea here
  is that some random SELECT queries could be generated as "CREATE
  TABLE|VIEW AS" at execution time, based on weights in the query
  profile. First, we remove the use of raw string literals for this,
  since raw string literals can be error-prone, and introduce a
  StatementExecutionMode class to contain a namespace for the enumerated
  statement execution modes. Second, we introduce a couple of new execution
  modes. The first is DML_SETUP: this is a DML statement that needs to
  be run in both the test and reference databases concurrently. For our
  purposes, it's the INSERT ... SELECT that copies data from the chosen
  random table into the table copy. The second is DML_TEST: this is a
  randomly-generated DML statement.

- Switch to using absolute imports in many places. There was a mix of
  absolute and relative imports happening here, and they were causing
  problems, especially when comparing data types. In Python,
  <class 'db_types.Int'> != <class 'tests.comparison.db_types.Int'>.
  Using
    from __future__ import absolute_import
  didn't seem to catch the relative import usage anyway, so I haven't
  employed that.

- Rename some, but not nearly all, names from "query" to "statement".
  Doing this is a rather large undertaking leading to much larger diffs
  and testing (IMPALA-4602).

- Fix a handful of flake8 warnings. There are a bunch that went unfixed
  for over- and under-indentation.

- Testing
  o ./discrepancy_searcher.py runs with and without --explain-only, and
  with --profile default and --profile dmlonly. For tpch_kudu data, it
  seems sufficient to use a --timeout of about 300.
  o Leopard run to make sure standard SELECT-only generation still works
  o Generated random stress queries locally
  o Generated random data locally

Change-Id: Ia4c63a2223185d0e056cc5713796772e5d1b8414
Reviewed-on: http://gerrit.cloudera.org:8080/5387
Reviewed-by: Jim Apple <jb...@apache.org>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/54665120
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/54665120
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/54665120

Branch: refs/heads/master
Commit: 54665120cbf4f242688659941ac50bacc8c221c8
Parents: 5755261
Author: Michael Brown <mi...@cloudera.com>
Authored: Thu Dec 1 09:27:43 2016 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Jan 12 21:40:39 2017 +0000

----------------------------------------------------------------------
 tests/comparison/cli_options.py                 |   6 +-
 tests/comparison/common.py                      |   6 +-
 tests/comparison/data_generator.py              |   8 +-
 .../comparison/data_generator_mapred_common.py  |   4 +-
 tests/comparison/db_connection.py               |   4 +-
 tests/comparison/discrepancy_searcher.py        | 189 +++++++++++++++----
 tests/comparison/funcs.py                       |   4 +-
 tests/comparison/leopard/job.py                 |   3 +-
 tests/comparison/model_translator.py            |   8 +-
 tests/comparison/query.py                       |  57 +++++-
 tests/comparison/query_flattener.py             |   8 +-
 tests/comparison/query_generator.py             |  39 ++--
 tests/comparison/query_profile.py               |  38 +++-
 tests/comparison/random_val_generator.py        |   2 +-
 tests/comparison/statement_generator.py         |  86 +++++++++
 tests/comparison/tests/test_use_nested_with.py  |   6 +-
 tests/stress/concurrent_select.py               |   2 +-
 17 files changed, 370 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/cli_options.py
----------------------------------------------------------------------
diff --git a/tests/comparison/cli_options.py b/tests/comparison/cli_options.py
index 70740e6..06dacb1 100644
--- a/tests/comparison/cli_options.py
+++ b/tests/comparison/cli_options.py
@@ -23,8 +23,8 @@ import sys
 from getpass import getuser
 from tempfile import gettempdir
 
-import db_connection
-from cluster import (
+from tests.comparison import db_connection
+from tests.comparison.cluster import (
     CmCluster,
     DEFAULT_HIVE_HOST,
     DEFAULT_HIVE_PASSWORD,
@@ -33,7 +33,7 @@ from cluster import (
     MiniCluster,
     MiniHiveCluster,
 )
-from db_types import TYPES
+from tests.comparison.db_types import TYPES
 
 
 def add_logging_options(parser, default_debug_log_file=None):

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/common.py
----------------------------------------------------------------------
diff --git a/tests/comparison/common.py b/tests/comparison/common.py
index 376243d..0736f5a 100644
--- a/tests/comparison/common.py
+++ b/tests/comparison/common.py
@@ -33,7 +33,7 @@ def get_import(name):
   # noqa below tells flake8 to not warn when it thinks imports are not used
   global __ALREADY_IMPORTED
   if not __ALREADY_IMPORTED:
-    from db_types import (  # noqa
+    from tests.comparison.db_types import (  # noqa
         BigInt,
         Boolean,
         Char,
@@ -44,8 +44,8 @@ def get_import(name):
         JOINABLE_TYPES,
         Number,
         Timestamp)
-    from funcs import AggFunc, AnalyticFunc, Func  # noqa
-    from query import InlineView, Subquery, WithClauseInlineView  # noqa
+    from tests.comparison.funcs import AggFunc, AnalyticFunc, Func  # noqa
+    from tests.comparison.query import InlineView, Subquery, WithClauseInlineView  # noqa
     for key, value in locals().items():
       globals()[key] = value
     __ALREADY_IMPORTED = True

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/data_generator.py
----------------------------------------------------------------------
diff --git a/tests/comparison/data_generator.py b/tests/comparison/data_generator.py
index e1f0a01..f1b10cd 100755
--- a/tests/comparison/data_generator.py
+++ b/tests/comparison/data_generator.py
@@ -32,13 +32,13 @@ from logging import getLogger
 from random import choice, randint, seed
 from time import time
 
-from data_generator_mapred_common import (
+from tests.comparison.data_generator_mapred_common import (
     estimate_rows_per_reducer,
     MB_PER_REDUCER,
     serialize,
     TextTableDataGenerator)
-from common import Column, Table
-from db_types import (
+from tests.comparison.common import Column, Table
+from tests.comparison.db_types import (
     Char,
     Decimal,
     EXACT_TYPES,
@@ -247,7 +247,7 @@ class DbPopulator(object):
 if __name__ == '__main__':
   from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
 
-  import cli_options
+  from tests.comparison import cli_options
 
   parser = ArgumentParser(
       usage='usage: \n'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/data_generator_mapred_common.py
----------------------------------------------------------------------
diff --git a/tests/comparison/data_generator_mapred_common.py b/tests/comparison/data_generator_mapred_common.py
index 42265cb..bcf1620 100644
--- a/tests/comparison/data_generator_mapred_common.py
+++ b/tests/comparison/data_generator_mapred_common.py
@@ -27,8 +27,8 @@ import base64
 import pickle
 import StringIO
 
-from db_types import Decimal
-from random_val_generator import RandomValGenerator
+from tests.comparison.db_types import Decimal
+from tests.comparison.random_val_generator import RandomValGenerator
 
 def serialize(value):
   '''Returns a serialized representation of 'value' suitable for use as a key in an MR

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/db_connection.py
----------------------------------------------------------------------
diff --git a/tests/comparison/db_connection.py b/tests/comparison/db_connection.py
index c0f39f0..040d30a 100644
--- a/tests/comparison/db_connection.py
+++ b/tests/comparison/db_connection.py
@@ -45,14 +45,14 @@ from textwrap import dedent
 from threading import Lock
 from time import time
 
-from common import (
+from tests.comparison.common import (
     ArrayColumn,
     Column,
     MapColumn,
     StructColumn,
     Table,
     TableExprList)
-from db_types import (
+from tests.comparison.db_types import (
     Char,
     Decimal,
     Double,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/discrepancy_searcher.py
----------------------------------------------------------------------
diff --git a/tests/comparison/discrepancy_searcher.py b/tests/comparison/discrepancy_searcher.py
index 57e284a..3bd8d23 100755
--- a/tests/comparison/discrepancy_searcher.py
+++ b/tests/comparison/discrepancy_searcher.py
@@ -22,6 +22,8 @@
    results.
 
 '''
+# TODO: IMPALA-4600: refactor this module
+
 from copy import deepcopy
 from decimal import Decimal
 from itertools import izip
@@ -36,21 +38,30 @@ from tempfile import gettempdir
 from threading import current_thread, Thread
 from time import time
 
-from db_types import BigInt
-from db_connection import (
+from tests.comparison.db_types import BigInt
+from tests.comparison.db_connection import (
     DbCursor,
     IMPALA,
     HIVE,
     MYSQL,
     ORACLE,
     POSTGRESQL)
-from model_translator import SqlWriter
-from query_flattener import QueryFlattener
-from query_generator import QueryGenerator
+from tests.comparison.model_translator import SqlWriter
+from tests.comparison.query import (
+    FromClause,
+    InsertClause,
+    InsertStatement,
+    Query,
+    StatementExecutionMode,
+    SelectClause,
+    SelectItem)
+from tests.comparison.query_flattener import QueryFlattener
+from tests.comparison.statement_generator import get_generator
 from tests.comparison import db_connection
 
 LOG = getLogger(__name__)
 
+
 class QueryResultComparator(object):
   '''Used for comparing the results of a Query across two databases'''
 
@@ -151,8 +162,8 @@ class QueryResultComparator(object):
 
     comparison_result.ref_row_count = len(ref_data_set)
     comparison_result.test_row_count = len(test_data_set)
-    comparison_result.query_resulted_in_data = (comparison_result.test_row_count > 0
-        or comparison_result.ref_row_count > 0)
+    comparison_result.query_resulted_in_data = (comparison_result.test_row_count > 0 or
+                                                comparison_result.ref_row_count > 0)
     if comparison_result.ref_row_count != comparison_result.test_row_count:
       return comparison_result
 
@@ -192,6 +203,10 @@ class QueryResultComparator(object):
         return comparison_result
     comparison_result.query_resulted_in_data = found_data
 
+    # If we're here, it means ref and test data sets are equal for a DML statement.
+    if isinstance(query, (InsertStatement,)):
+      comparison_result.modified_rows_count = test_data_set[0][0]
+
     return comparison_result
 
   def standardize_data(self, data, ref_col_description, test_col_description):
@@ -279,7 +294,7 @@ class QueryExecutor(object):
       try:
         unlink(link)
       except OSError as e:
-        if not 'No such file' in str(e):
+        if 'No such file' not in str(e):
           raise e
       try:
         symlink(query_log_path, link)
@@ -347,18 +362,22 @@ class QueryExecutor(object):
 
        "query" should be an instance of query.Query.
     '''
-    if query.execution != 'RAW':
+    if query.execution in (StatementExecutionMode.CREATE_TABLE_AS,
+                           StatementExecutionMode.CREATE_VIEW_AS):
       self._table_or_view_name = self._create_random_table_name()
+    elif isinstance(query, (InsertStatement,)):
+      self._table_or_view_name = query.dml_table.name
 
     query_threads = list()
-    for sql_writer, cursor, log_file \
-        in izip(self.sql_writers, self.cursors, self.query_logs):
+    for sql_writer, cursor, log_file in izip(
+        self.sql_writers, self.cursors, self.query_logs
+    ):
       if self.ENABLE_RANDOM_QUERY_OPTIONS and cursor.db_type == IMPALA:
         self.set_impala_query_options(cursor)
       query_thread = Thread(
           target=self._fetch_sql_results,
           args=[query, cursor, sql_writer, log_file],
-          name='Query execution thread {0}'.format(current_thread().name))
+          name='Statement execution thread {0}'.format(current_thread().name))
       query_thread.daemon = True
       query_thread.sql = ''
       query_thread.data_set = None
@@ -389,10 +408,14 @@ class QueryExecutor(object):
         query_thread.exception = QueryTimeout(
             'Query timed out after %s seconds' % self.query_timeout_seconds)
 
-    return [(query_thread.sql,
-        query_thread.exception,
-        query_thread.data_set,
-        query_thread.cursor_description) for query_thread in query_threads]
+      if (query.execution in (StatementExecutionMode.CREATE_TABLE_AS,
+                              StatementExecutionMode.DML_TEST)):
+        cursor.drop_table(self._table_or_view_name)
+      elif query.execution == StatementExecutionMode.CREATE_VIEW_AS:
+        cursor.drop_view(self._table_or_view_name)
+
+    return [(query_thread.sql, query_thread.exception, query_thread.data_set,
+             query_thread.cursor_description) for query_thread in query_threads]
 
   def _fetch_sql_results(self, query, cursor, sql_writer, log_file):
     '''Execute the query using the cursor and set the result or exception on the local
@@ -405,12 +428,17 @@ class QueryExecutor(object):
         # testing of Impala nested types support.
         query = deepcopy(query)
         QueryFlattener().flatten(query)
-      if query.execution == 'CREATE_TABLE_AS':
+      if query.execution == StatementExecutionMode.CREATE_TABLE_AS:
         setup_sql = sql_writer.write_create_table_as(query, self._table_or_view_name)
         query_sql = 'SELECT * FROM ' + self._table_or_view_name
-      elif query.execution == 'VIEW':
+      elif query.execution == StatementExecutionMode.CREATE_VIEW_AS:
         setup_sql = sql_writer.write_create_view(query, self._table_or_view_name)
         query_sql = 'SELECT * FROM ' + self._table_or_view_name
+      elif isinstance(query, (InsertStatement,)):
+        setup_sql = sql_writer.write_query(query)
+        # TODO: improve validation (IMPALA-4599). This is good enough for looking for
+        # crashes on DML statements
+        query_sql = 'SELECT COUNT(*) FROM ' + self._table_or_view_name
       else:
         setup_sql = None
         query_sql = sql_writer.write_query(query)
@@ -444,13 +472,11 @@ class QueryExecutor(object):
           break
         if len(data_set) > row_limit:
           raise DataLimitExceeded('Too much data')
+      if isinstance(query, (InsertStatement,)):
+        LOG.debug('Total row count for {0}: {1}'.format(
+          cursor.db_type, str(data_set)))
     except Exception as e:
       current_thread().exception = e
-    finally:
-      if query.execution == 'CREATE_TABLE_AS':
-        cursor.drop_table(self._table_or_view_name)
-      elif query.execution == 'VIEW':
-        cursor.drop_view(self._table_or_view_name)
 
   def _create_random_table_name(self):
     char_choices = ascii_lowercase
@@ -479,6 +505,7 @@ class ComparisonResult(object):
     self.ref_row = None   # The test row where mismatch happened
     self.test_row = None   # The reference row where mismatch happened
     self.exception = None
+    self.modified_rows_count = None
     self._error_message = None
 
   @property
@@ -526,6 +553,7 @@ QueryTimeout = type('QueryTimeout', (Exception, ), {})
 TypeOverflow = type('TypeOverflow', (Exception, ), {})
 DataLimitExceeded = type('DataLimitExceeded', (Exception, ), {})
 
+
 class KnownError(Exception):
 
   def __init__(self, jira_url):
@@ -549,7 +577,6 @@ class FrontendExceptionSearcher(object):
           raise Exception("Unable to find a common set of tables in both databases")
 
   def search(self, number_of_test_queries):
-    query_generator = QueryGenerator(self.query_profile)
 
     def on_ref_db_error(e, sql):
       LOG.warn("Error generating explain plan for reference db:\n%s\n%s" % (e, sql))
@@ -560,7 +587,14 @@ class FrontendExceptionSearcher(object):
 
     for idx in xrange(number_of_test_queries):
       LOG.info("Explaining query #%s" % (idx + 1))
-      query = query_generator.create_query(self.common_tables)
+      statement_type = self.query_profile.choose_statement()
+      statement_generator = get_generator(statement_type)(self.query_profile)
+      if issubclass(statement_type, (InsertStatement,)):
+        dml_table = self.query_profile.choose_table(self.common_tables)
+      else:
+        dml_table = None
+      query = statement_generator.generate_statement(
+          self.common_tables, dml_table=dml_table)
       if not self._explain_query(self.ref_conn, self.ref_sql_writer, query,
           on_ref_db_error):
         continue
@@ -587,6 +621,8 @@ class QueryResultDiffSearcher(object):
   # Sometimes things get into a bad state and the same error loops forever
   ABORT_ON_REPEAT_ERROR_COUNT = 2
 
+  COPY_TABLE_SUFFIX = '__qgen_copy'
+
   def __init__(self, query_profile, ref_conn, test_conn):
     '''query_profile should be an instance of one of the profiles in query_profile.py'''
     self.query_profile = query_profile
@@ -598,6 +634,46 @@ class QueryResultDiffSearcher(object):
         if not self.common_tables:
           raise Exception("Unable to find a common set of tables in both databases")
 
+  def _concurrently_copy_table(self, src_table):
+    """
+    Given a Table object, create another Table with the same schema and return the new
+    Table object.  The schema will be created in both the test and reference databases.
+
+    The data is then copied in both the ref and test databases using threads.
+    """
+    with test_conn.cursor() as test_cursor:
+      test_cursor.execute('SHOW CREATE TABLE {0}'.format(src_table.name))
+      (create_table_sql,) = test_cursor.fetchall()[0]
+      new_table_name = src_table.name + self.COPY_TABLE_SUFFIX
+      create_table_sql = create_table_sql.replace(src_table.name, new_table_name, 1)
+      test_cursor.drop_table(new_table_name)
+      test_cursor.execute(create_table_sql)
+      new_table = test_cursor.describe_table(new_table_name)
+    with ref_conn.cursor() as ref_cursor:
+      ref_cursor.drop_table(new_table_name)
+      ref_cursor.create_table(new_table)
+
+    copy_select_query = Query()
+    copy_select_query.select_clause = SelectClause(
+        [SelectItem(col) for col in src_table.cols])
+    copy_select_query.from_clause = FromClause(src_table)
+
+    if new_table.primary_keys:
+      conflict_action = InsertStatement.CONFLICT_ACTION_IGNORE
+    else:
+      conflict_action = InsertStatement.CONFLICT_ACTION_DEFAULT
+
+    table_copy_statement = InsertStatement(
+        insert_clause=InsertClause(new_table), select_query=copy_select_query,
+        conflict_action=conflict_action, execution=StatementExecutionMode.DML_SETUP)
+
+    result = self.query_result_comparator.compare_query_results(table_copy_statement)
+    if result.error:
+      raise Exception('setup SQL to copy table failed: {0}'.format(result.error))
+    self._dml_table_size = result.modified_rows_count
+
+    return new_table
+
   def search(self, number_of_test_queries, stop_on_result_mismatch, stop_on_crash,
              query_timeout_seconds):
     '''Returns an instance of SearchResults, which is a summary report. This method
@@ -607,9 +683,8 @@ class QueryResultDiffSearcher(object):
       to generate and execute.
     '''
     start_time = time()
-    query_result_comparator = QueryResultComparator(
+    self.query_result_comparator = QueryResultComparator(
         self.query_profile, self.ref_conn, self.test_conn, query_timeout_seconds)
-    query_generator = QueryGenerator(self.query_profile)
     query_count = 0
     queries_resulted_in_data_count = 0
     mismatch_count = 0
@@ -618,14 +693,40 @@ class QueryResultDiffSearcher(object):
     test_crash_count = 0
     last_error = None
     repeat_error_count = 0
+    count_effective_dml_statements = 0
+    count_rows_affected_by_dml = 0
+
     while number_of_test_queries > query_count:
-      query = query_generator.create_query(self.common_tables)
-      query.execution = self.query_profile.get_query_execution()
+      statement_type = self.query_profile.choose_statement()
+      statement_generator = get_generator(statement_type)(self.query_profile)
+      dml_table = None
+      if issubclass(statement_type, (InsertStatement,)):
+        dml_choice_src_table = self.query_profile.choose_table(self.common_tables)
+        # Copy the table we want to INSERT INTO. Do this for the following reasons:
+        #
+        # 1. If we don't copy, the tables will get larger and larger
+        # 2. If we want to avoid tables getting larger and larger, we have to come up
+        # with some threshold about when to cut and start over.
+        # 3. If we keep INSERTing into tables and finally find a crash, we have to
+        # replay all previous INSERTs again. Those INSERTs may not produce the same rows
+        # as before. To maximize the chance of bug reproduction, run every INSERT on a
+        # pristine table.
+        dml_table = self._concurrently_copy_table(dml_choice_src_table)
+      statement = statement_generator.generate_statement(
+          self.common_tables, dml_table=dml_table)
+      if isinstance(statement, Query):
+        # we can re-write statement execution here to possibly be a CREATE TABLE AS SELECT
+        # or CREATE VIEW AS SELECT
+        statement.execution = self.query_profile.get_query_execution()
       query_count += 1
       LOG.info('Running query #%s', query_count)
-      result = query_result_comparator.compare_query_results(query)
+      result = self.query_result_comparator.compare_query_results(statement)
       if result.query_resulted_in_data:
         queries_resulted_in_data_count += 1
+      if result.modified_rows_count:
+        count_effective_dml_statements += 1
+        count_rows_affected_by_dml += abs(
+            result.modified_rows_count - self._dml_table_size)
       if isinstance(result.exception, DataLimitExceeded) \
           or isinstance(result.exception, TypeOverflow):
         continue
@@ -671,13 +772,13 @@ class QueryResultDiffSearcher(object):
           ]
           call(impala_restart_cmd)
           self.test_conn.reconnect()
-          query_result_comparator.test_cursor = self.test_conn.cursor()
-          result = query_result_comparator.compare_query_results(query)
+          self.query_result_comparator.test_cursor = self.test_conn.cursor()
+          result = self.query_result_comparator.compare_query_results(statement)
           if result.error:
             LOG.info('Restarting Impala')
             call(impala_restart_cmd)
             self.test_conn.reconnect()
-            query_result_comparator.test_cursor = self.test_conn.cursor()
+            self.query_result_comparator.test_cursor = self.test_conn.cursor()
           else:
             break
 
@@ -708,7 +809,9 @@ class QueryResultDiffSearcher(object):
         query_timeout_count,
         known_error_count,
         test_crash_count,
-        time() - start_time)
+        time() - start_time,
+        count_effective_dml_statements,
+        count_rows_affected_by_dml)
 
 
 class SearchResults(object):
@@ -721,7 +824,10 @@ class SearchResults(object):
       query_timeout_count,
       known_error_count,
       test_crash_count,
-      run_time_in_seconds):
+      run_time_in_seconds,
+      count_effective_dml_statements,
+      count_rows_affected_by_dml
+    ):
     # Approx number of queries run, some queries may have been ignored
     self.query_count = query_count
     self.queries_resulted_in_data_count = queries_resulted_in_data_count
@@ -731,6 +837,10 @@ class SearchResults(object):
     self.known_error_count = known_error_count
     self.test_crash_count = test_crash_count
     self.run_time_in_seconds = run_time_in_seconds
+    # number of DML statements that actually modified tables
+    self.count_effective_dml_statements = count_effective_dml_statements
+    # total number of rows modified by DML statements
+    self.count_rows_affected_by_dml = count_rows_affected_by_dml
 
   def __str__(self):
     '''Returns the string representation of the results.'''
@@ -752,6 +862,8 @@ class SearchResults(object):
         '%(run_time)s.\n'
         '%(queries_resulted_in_data_count)s of %(query_count)s queries produced results.'
         '\n'
+        '%(count_effective_dml_statements)s of %(query_count)s statements modified a '
+        'total of %(count_rows_affected_by_dml)s rows\n'
         '%(test_crash_count)s crashes occurred.\n'
         '%(known_error_count)s queries were excluded from the mismatch count because '
         'they are known errors.\n'
@@ -763,8 +875,8 @@ if __name__ == '__main__':
   import sys
   from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
 
-  import cli_options
-  from query_profile import PROFILES
+  from tests.comparison import cli_options
+  from tests.comparison.query_profile import PROFILES
 
   parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
   cli_options.add_logging_options(parser)
@@ -822,6 +934,7 @@ if __name__ == '__main__':
     diff_searcher = QueryResultDiffSearcher(query_profile, ref_conn, test_conn)
     query_timeout_seconds = args.timeout
     search_results = diff_searcher.search(
-        args.query_count, args.stop_on_mismatch, args.stop_on_crash, query_timeout_seconds)
+        args.query_count, args.stop_on_mismatch, args.stop_on_crash,
+        query_timeout_seconds)
     print(search_results)
     sys.exit(search_results.mismatch_count)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/funcs.py
----------------------------------------------------------------------
diff --git a/tests/comparison/funcs.py b/tests/comparison/funcs.py
index bebe305..b6d7b92 100644
--- a/tests/comparison/funcs.py
+++ b/tests/comparison/funcs.py
@@ -18,8 +18,8 @@
 from copy import deepcopy
 from itertools import ifilter
 
-from common import ValExpr
-from db_types import (
+from tests.comparison.common import ValExpr
+from tests.comparison.db_types import (
     Boolean,
     Char,
     DataType,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/leopard/job.py
----------------------------------------------------------------------
diff --git a/tests/comparison/leopard/job.py b/tests/comparison/leopard/job.py
index a6fd209..b526d4c 100755
--- a/tests/comparison/leopard/job.py
+++ b/tests/comparison/leopard/job.py
@@ -163,8 +163,9 @@ class Job(object):
       while num_unexpected_errors < NUM_UNEXPECTED_ERRORS_THRESHOLD:
         query = None
         try:
+          # TODO: Support DML statements. This could be part of IMPALA-4600.
           self.query_generator = QueryGenerator(self.query_profile)
-          query = self.query_generator.create_query(self.common_tables)
+          query = self.query_generator.generate_statement(self.common_tables)
         except IndexError as e:
           # This is a query generator bug that happens extremely rarely
           LOG.info('Query Generator Choice Problem, {0}'.format(e))

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/model_translator.py
----------------------------------------------------------------------
diff --git a/tests/comparison/model_translator.py b/tests/comparison/model_translator.py
index 61b041e..14e70b7 100644
--- a/tests/comparison/model_translator.py
+++ b/tests/comparison/model_translator.py
@@ -20,8 +20,8 @@ from logging import getLogger
 from re import sub
 from sqlparse import format
 
-from common import StructColumn, CollectionColumn
-from db_types import (
+from tests.comparison.common import StructColumn, CollectionColumn
+from tests.comparison.db_types import (
     Char,
     Decimal,
     Float,
@@ -29,8 +29,8 @@ from db_types import (
     String,
     Timestamp,
     VarChar)
-from query import InsertStatement, Query
-from query_flattener import QueryFlattener
+from tests.comparison.query import InsertStatement, Query
+from tests.comparison.query_flattener import QueryFlattener
 
 LOG = getLogger(__name__)
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/query.py
----------------------------------------------------------------------
diff --git a/tests/comparison/query.py b/tests/comparison/query.py
index 8821a85..022dd9c 100644
--- a/tests/comparison/query.py
+++ b/tests/comparison/query.py
@@ -19,11 +19,32 @@ from abc import ABCMeta, abstractproperty
 from copy import deepcopy
 from logging import getLogger
 
-from common import Column, TableExpr, TableExprList, ValExpr, ValExprList
+from tests.comparison.common import Column, TableExpr, TableExprList, ValExpr, ValExprList
+
 
 LOG = getLogger(__name__)
 
 
+class StatementExecutionMode(object):
+  """
+  Provide a name space for statement execution modes.
+  """
+  (
+      # A SELECT statement is executed and results are compared.
+      SELECT_STATEMENT,
+      # If this is chosen, statement execution will run the CTAS statement and then
+      # SELECT * on the table for comparison. The table is torn down after.
+      CREATE_TABLE_AS,
+      # Same as above, except with a view.
+      CREATE_VIEW_AS,
+      # a DML operation that isn't actually a test, but some setup operation that needs
+      # to be run concurrently
+      DML_SETUP,
+      # a DML statement that's actually a test
+      DML_TEST,
+  ) = xrange(5)
+
+
 class AbstractStatement(object):
   """
   Abstract query representation
@@ -38,10 +59,7 @@ class AbstractStatement(object):
     self.parent = None
     # optional WITH clause some statements may have
     self.with_clause = None
-    # Used by QueryExecutor to track whether this query is "raw", a CREATE TABLE AS
-    # SELECT, or CREATE VIEW AS SELECT
-    # TODO: Instead of plain strings, these values should be from some enumerated type.
-    self.execution = 'RAW'
+    self._execution = None
 
   @abstractproperty
   def table_exprs(self):
@@ -66,6 +84,19 @@ class AbstractStatement(object):
     """
     pass
 
+  @property
+  def execution(self):
+    """
+    one of the possible StatementExecutionMode values (see class definition for meaning)
+    """
+    if self._execution is None:
+      raise Exception('execution is not set on this object')
+    return self._execution
+
+  @execution.setter
+  def execution(self, val):
+    self._execution = val
+
 
 class Query(AbstractStatement):
   # TODO: This has to be called Query for as long as we want to unpickle old reports, or
@@ -89,6 +120,10 @@ class Query(AbstractStatement):
     self.union_clause = None
     self.order_by_clause = None
     self.limit_clause = None
+    # This is a fine default value, because any well-formed object will be a SELECT
+    # statement. Only the discrepancy searcher makes the decision at run time to change
+    # this.
+    self.execution = StatementExecutionMode.SELECT_STATEMENT
 
   def __deepcopy__(self, memo):
     other = Query()
@@ -705,7 +740,8 @@ class InsertStatement(AbstractStatement):
    CONFLICT_ACTION_IGNORE) = range(2)
 
   def __init__(self, with_clause=None, insert_clause=None, select_query=None,
-               values_clause=None, conflict_action=CONFLICT_ACTION_DEFAULT):
+               values_clause=None, conflict_action=CONFLICT_ACTION_DEFAULT,
+               execution=None):
     """
     Represent an INSERT statement. The INSERT may have an optional WithClause, and then
     either a SELECT query (Query) object from whose rows we INSERT, or a VALUES clause,
@@ -718,11 +754,14 @@ class InsertStatement(AbstractStatement):
     CONFLICT DO NOTHING". The syntax doesn't change for Impala, but the implied
     semantics are needed: if we are INSERTing a Kudu table, conflict_action must be
     CONFLICT_ACTION_IGNORE.
+
+    The execution attribute is used by the discrepancy_searcher to track whether this
+    InsertStatement is some sort of setup operation or a true random statement test.
     """
     super(InsertStatement, self).__init__()
     self._select_query = None
     self._values_clause = None
-
+    self.execution = execution
     self.select_query = select_query
     self.values_clause = values_clause
     self.with_clause = with_clause
@@ -772,3 +811,7 @@ class InsertStatement(AbstractStatement):
       queries.append(self.select_query)
       queries.extend(self.select_query.nested_queries)
     return queries
+
+  @property
+  def dml_table(self):
+    return self.insert_clause.table

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/query_flattener.py
----------------------------------------------------------------------
diff --git a/tests/comparison/query_flattener.py b/tests/comparison/query_flattener.py
index 9fc7e44..2a11913 100644
--- a/tests/comparison/query_flattener.py
+++ b/tests/comparison/query_flattener.py
@@ -18,14 +18,14 @@
 from copy import deepcopy
 from logging import getLogger
 
-from common import (
+from tests.comparison.common import (
     CollectionColumn,
     Column,
     StructColumn,
     Table)
-from db_types import BigInt, Boolean
-from funcs import Equals, And
-from query import (
+from tests.comparison.db_types import BigInt, Boolean
+from tests.comparison.funcs import Equals, And
+from tests.comparison.query import (
     FromClause,
     InlineView,
     JoinClause,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/query_generator.py
----------------------------------------------------------------------
diff --git a/tests/comparison/query_generator.py b/tests/comparison/query_generator.py
index 3a923e0..dd30287 100644
--- a/tests/comparison/query_generator.py
+++ b/tests/comparison/query_generator.py
@@ -21,9 +21,9 @@ from itertools import ifilter
 from logging import getLogger
 from random import shuffle, choice, randint, randrange
 
-from common import TableExprList, ValExpr, ValExprList, Table, Column
-from query_profile import DefaultProfile
-from funcs import (
+from tests.comparison.common import TableExprList, ValExpr, ValExprList, Table, Column
+from tests.comparison.query_profile import DefaultProfile
+from tests.comparison.funcs import (
     AGG_FUNCS,
     AggFunc,
     ANALYTIC_FUNCS,
@@ -36,12 +36,12 @@ from funcs import (
     PartitionByClause,
     WindowBoundary,
     WindowClause)
-from db_types import (
+from tests.comparison.db_types import (
     Boolean,
     Int,
     Float,
     TYPES)
-from query import (
+from tests.comparison.query import (
     FromClause,
     GroupByClause,
     HavingClause,
@@ -66,6 +66,7 @@ UNBOUNDED_FOLLOWING = WindowBoundary.UNBOUNDED_FOLLOWING
 
 LOG = getLogger(__name__)
 
+
 class QueryGenerator(object):
 
   def __init__(self, query_profile):
@@ -94,7 +95,7 @@ class QueryGenerator(object):
     self.profile.query = None
     self.max_nested_query_count = None
 
-  def create_query(self,
+  def generate_statement(self,
       table_exprs,
       allow_with_clause=True,
       allow_union_clause=True,
@@ -102,6 +103,7 @@ class QueryGenerator(object):
       required_select_item_type=None,
       required_table_expr_col_type=None,
       require_aggregate=None,
+      dml_table=None,
       table_alias_prefix='t'):
     '''Create a random query using various language features.
 
@@ -134,6 +136,9 @@ class QueryGenerator(object):
        require_aggregate can be set to True or False to force or disable the creation
        of an aggregate query. This is used during Subquery creation where the context
        may require an aggregate or non-aggregate.
+
+       dml_table exists for kwargs compatibility with other statement generators but is
+       ignored
     '''
     if not table_exprs:
       raise Exception("At least one TableExpr is needed")
@@ -220,7 +225,7 @@ class QueryGenerator(object):
       for select_item in select_clause.items:
         select_item_data_types.append(
             choice(data_type_candidates_by_base_type[select_item.val_expr.base_type]))
-      query.union_clause = UnionClause(self.create_query(
+      query.union_clause = UnionClause(self.generate_statement(
           table_exprs,
           allow_with_clause=False,
           select_item_data_types=select_item_data_types))
@@ -249,8 +254,8 @@ class QueryGenerator(object):
     with_clause_inline_views = TableExprList()
     for with_clause_inline_view_idx \
         in xrange(self.profile.get_with_clause_table_ref_count()):
-      query = self.create_query(table_exprs,
-                                allow_with_clause=self.profile.use_nested_with())
+      query = self.generate_statement(table_exprs,
+                                      allow_with_clause=self.profile.use_nested_with())
       with_clause_alias_count = getattr(self.root_query, 'with_clause_alias_count', 0) + 1
       self.root_query.with_clause_alias_count = with_clause_alias_count
       with_clause_inline_view = \
@@ -504,7 +509,7 @@ class QueryGenerator(object):
             join_expr_type = None
           select_item_data_types = \
               [signature_arg.type] if use_scalar_subquery else signature_arg.type
-          query = self.create_query(
+          query = self.generate_statement(
               table_exprs,
               select_item_data_types=select_item_data_types,
               required_table_expr_col_type=join_expr_type,
@@ -568,7 +573,7 @@ class QueryGenerator(object):
         if not signature_arg.can_be_null and not arg.is_constant:
           val = self.profile.choose_constant(return_type=arg.type, allow_null=False)
           func.args[idx] = Coalesce.create_from_args(arg, val)
-        if not arg.is_constant or not arg.val is None:
+        if not arg.is_constant or arg.val is not None:
           has_non_null_literal_arg = True
     return func
 
@@ -669,7 +674,7 @@ class QueryGenerator(object):
     while max_children:
       null_arg_pool = None
       parent_func, parent_arg_idx = None, None
-      chosen_signature= None
+      chosen_signature=None
 
       # Since aggregate (and let's assume analytic functions) return a limited set of
       # types, some prep work may be needed if the return type isn't in the set of
@@ -1144,9 +1149,9 @@ class QueryGenerator(object):
     return deepcopy(self.profile.choose_table(table_exprs))
 
   def _create_inline_view(self, table_exprs, required_type=None):
-    return InlineView(self.create_query(
-      table_exprs, required_select_item_type=required_type,
-      allow_with_clause=self.profile.use_nested_with()))
+    return InlineView(self.generate_statement(
+        table_exprs, required_select_item_type=required_type,
+        allow_with_clause=self.profile.use_nested_with()))
 
   def _create_join_clause(self, from_clause, table_exprs):
     join_type = self.profile.choose_join_type(JoinClause.JOINS_TYPES)
@@ -1352,7 +1357,7 @@ class QueryGenerator(object):
     #
     # then choosing a random val (which is a list of aggs) in the above dict, and
     # finally randomly adding DISTINCT to items in the list.
-    exprs_to_funcs =  defaultdict(list)
+    exprs_to_funcs = defaultdict(list)
     for item in agg_items:
       for expr, funcs in self._group_agg_funcs_by_expr(item.val_expr).iteritems():
         exprs_to_funcs[expr].extend(funcs)
@@ -1548,7 +1553,7 @@ def generate_queries_for_manual_inspection():
   ref_writer = SqlWriter.create(dialect='POSTGRESQL',
       nulls_order_asc=query_profile.nulls_order_asc())
   for _ in xrange(NUM_QUERIES):
-    query = query_generator.create_query(tables)
+    query = query_generator.generate_statement(tables)
     print("Test db")
     print(sql_writer.write_query(query) + '\n')
     print("Ref db")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/query_profile.py
----------------------------------------------------------------------
diff --git a/tests/comparison/query_profile.py b/tests/comparison/query_profile.py
index f58fa84..b1e4ab4 100644
--- a/tests/comparison/query_profile.py
+++ b/tests/comparison/query_profile.py
@@ -18,7 +18,7 @@
 from logging import getLogger
 from random import choice, randint, random
 
-from db_types import (
+from tests.comparison.db_types import (
     Boolean,
     Char,
     Decimal,
@@ -26,7 +26,8 @@ from db_types import (
     Int,
     TYPES,
     Timestamp)
-from funcs import (
+from tests.comparison.query import InsertStatement, Query, StatementExecutionMode
+from tests.comparison.funcs import (
     AnalyticAvg,
     AnalyticCount,
     AnalyticFirstValue,
@@ -52,7 +53,7 @@ from funcs import (
     NotIn,
     Or,
     WindowBoundary)
-from random_val_generator import RandomValGenerator
+from tests.comparison.random_val_generator import RandomValGenerator
 
 UNBOUNDED_PRECEDING = WindowBoundary.UNBOUNDED_PRECEDING
 PRECEDING = WindowBoundary.PRECEDING
@@ -62,6 +63,7 @@ UNBOUNDED_FOLLOWING = WindowBoundary.UNBOUNDED_FOLLOWING
 
 LOG = getLogger()
 
+
 class DefaultProfile(object):
 
   def __init__(self):
@@ -190,9 +192,12 @@ class DefaultProfile(object):
             ('Scalar', 'NON_AGG', 'CORRELATED'): 0,   # Not supported
             ('Scalar', 'NON_AGG', 'UNCORRELATED'): 1},
         'QUERY_EXECUTION': {   # Used by the discrepancy searcher
-            'CREATE_TABLE_AS': 1,
-            'RAW': 10,
-            'VIEW': 1}}
+            StatementExecutionMode.CREATE_TABLE_AS: 1,
+            StatementExecutionMode.CREATE_VIEW_AS: 1,
+            StatementExecutionMode.SELECT_STATEMENT: 10},
+        'STATEMENT': {
+            # TODO: Eventually make this a mix of DML and SELECT (IMPALA-4601)
+            Query: 1}}
 
     # On/off switches
     self._flags = {
@@ -478,7 +483,8 @@ class DefaultProfile(object):
     missing_funcs = set(s.func for s in filtered_signatures) - set(func_weights)
     if missing_funcs:
       raise Exception("Weights are missing for functions: {0}".format(missing_funcs))
-    return self.choose_func_signature(filtered_signatures, self.weights('RELATIONAL_FUNCS'))
+    return self.choose_func_signature(filtered_signatures,
+                                      self.weights('RELATIONAL_FUNCS'))
 
   def choose_func_signature(self, signatures, _func_weights=None):
     '''Return a signature chosen from "signatures".'''
@@ -606,6 +612,9 @@ class DefaultProfile(object):
     """
     return None
 
+  def choose_statement(self):
+    return self._choose_from_weights('STATEMENT')
+
 
 class ImpalaNestedTypesProfile(DefaultProfile):
 
@@ -698,5 +707,20 @@ class HiveProfile(DefaultProfile):
     return (AnalyticAvg, AnalyticCount, AnalyticFirstValue, AnalyticLag,
             AnalyticLastValue, AnalyticLead, AnalyticMax, AnalyticMin, AnalyticSum)
 
+
+class DMLOnlyProfile(DefaultProfile):
+  """
+  Profile that only executes DML statements
+
+  TODO: This will be useful for testing DML; eventually this should be folded into the
+  default profile. (IMPALA-4601)
+  """
+  def __init__(self):
+    super(DMLOnlyProfile, self).__init__()
+    self._weights.update({
+        'STATEMENT': {
+            InsertStatement: 1}})
+
+
 PROFILES = [var for var in locals().values()
             if isinstance(var, type) and var.__name__.endswith('Profile')]

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/random_val_generator.py
----------------------------------------------------------------------
diff --git a/tests/comparison/random_val_generator.py b/tests/comparison/random_val_generator.py
index 768bb59..340a18c 100644
--- a/tests/comparison/random_val_generator.py
+++ b/tests/comparison/random_val_generator.py
@@ -19,7 +19,7 @@ from datetime import datetime, timedelta
 from decimal import Decimal as PyDecimal
 from random import randint, random, uniform
 
-from db_types import Boolean, Char, Decimal, Float, Int, Timestamp
+from tests.comparison.db_types import Boolean, Char, Decimal, Float, Int, Timestamp
 
 class RandomValGenerator(object):
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/statement_generator.py
----------------------------------------------------------------------
diff --git a/tests/comparison/statement_generator.py b/tests/comparison/statement_generator.py
new file mode 100644
index 0000000..5c37458
--- /dev/null
+++ b/tests/comparison/statement_generator.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.comparison.common import Table
+from tests.comparison.db_types import Char, Float, Int
+from tests.comparison.model_translator import SqlWriter
+from tests.comparison.query import (
+    InsertClause,
+    InsertStatement,
+    Query,
+    StatementExecutionMode,
+    ValuesClause,
+    ValuesRow)
+from tests.comparison.query_generator import QueryGenerator
+from tests.comparison.query_profile import DefaultProfile
+
+
+class InsertStatementGenerator(object):
+  def __init__(self, profile):
+    # QueryProfile-like object
+    self.profile = profile
+    # used to generate SELECT queries for INSERT ... SELECT statements
+    self.query_generator = QueryGenerator(profile)
+
+  def generate_statement(self, tables, dml_table):
+    """
+    Return a randomly generated INSERT statement.
+
+    tables should be a list of Table objects. A typical source of such a list comes from
+    db_connection.DbCursor.describe_common_tables(). This list describes the possible
+    "sources" of the INSERT's WITH and FROM/WHERE clauses.
+
+    dml_table is a required Table object. The INSERT will be into this table.
+
+    This is just a stub, good enough to generate a valid INSERT INTO ... VALUES
+    statement. Actual implementation is tracked in IMPALA-4353.
+    """
+    if not (isinstance(tables, list) and len(tables) > 0 and
+            all((isinstance(t, Table) for t in tables))):
+      raise Exception("tables must be a not-empty list of Table objects")
+
+    if not isinstance(dml_table, Table):
+      raise Exception('dml_table must be a Table')
+
+    if dml_table.primary_keys:
+      conflict_action = InsertStatement.CONFLICT_ACTION_IGNORE
+    else:
+      conflict_action = InsertStatement.CONFLICT_ACTION_DEFAULT
+
+    return InsertStatement(
+        insert_clause=InsertClause(dml_table),
+        values_clause=self.generate_values_clause(dml_table.cols),
+        conflict_action=conflict_action, execution=StatementExecutionMode.DML_TEST)
+
+  def generate_values_clause(self, table_columns):
+    constants = []
+    for col in table_columns:
+      val = self.profile.choose_constant(return_type=col.exact_type,
+                                         allow_null=(not col.is_primary_key))
+      constants.append(val)
+    return ValuesClause([ValuesRow(constants)])
+
+
+def get_generator(statement_type):
+  """
+  Given a statement type, return the proper statement generator.
+  """
+  STATEMENT_GENERATOR_MAP = {
+      InsertStatement: InsertStatementGenerator,
+      Query: QueryGenerator,
+  }
+  return STATEMENT_GENERATOR_MAP[statement_type]

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/comparison/tests/test_use_nested_with.py
----------------------------------------------------------------------
diff --git a/tests/comparison/tests/test_use_nested_with.py b/tests/comparison/tests/test_use_nested_with.py
index 418fdd7..ad9b236 100644
--- a/tests/comparison/tests/test_use_nested_with.py
+++ b/tests/comparison/tests/test_use_nested_with.py
@@ -15,8 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import pytest
-
 from tests.comparison.common import TableExprList, Column, Table
 from tests.comparison.db_types import Int
 from tests.comparison.query_generator import QueryGenerator
@@ -64,5 +62,5 @@ def test_use_nested_width_subquery():
   table_expr_list.append(left_table)
 
   # Check that each nested_query doesn't have a with clause
-  for nested_query in mock_query_gen.create_query(table_expr_list).nested_queries:
-    assert nested_query.with_clause is None
\ No newline at end of file
+  for nested_query in mock_query_gen.generate_statement(table_expr_list).nested_queries:
+    assert nested_query.with_clause is None

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54665120/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 690c999..c66fa34 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -1055,7 +1055,7 @@ def load_random_queries_and_populate_runtime_info(
 
   def generate_candidates():
     while True:
-      query_model = query_generator.create_query(tables)
+      query_model = query_generator.generate_statement(tables)
       sql = model_translator.write_query(query_model)
       query = Query()
       query.sql = sql


[4/4] incubator-impala git commit: IMPALA-4651: Add LibEv to build

Posted by he...@apache.org.
IMPALA-4651: Add LibEv to build

Add libev 4.20 to the Impala build. This is a dependency of KRPC.

FindLibEv.cmake was taken from Apache Kudu.

Change-Id: Iaf0646533592e6a8cd929a8cb015b83a7ea3008f
Reviewed-on: http://gerrit.cloudera.org:8080/5659
Tested-by: Impala Public Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a81ad5ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a81ad5ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a81ad5ea

Branch: refs/heads/master
Commit: a81ad5eaab8a088e1fc1429449a1c15b029cffdb
Parents: 5466512
Author: Henry Robinson <he...@cloudera.com>
Authored: Tue Nov 8 13:54:55 2016 -0800
Committer: Henry Robinson <he...@cloudera.com>
Committed: Thu Jan 12 23:44:26 2017 +0000

----------------------------------------------------------------------
 CMakeLists.txt                |  6 ++++++
 bin/bootstrap_toolchain.py    |  6 +++---
 bin/impala-config.sh          |  1 +
 cmake_modules/FindLibEv.cmake | 41 ++++++++++++++++++++++++++++++++++++++
 4 files changed, 51 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a81ad5ea/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 18be5d4..e0339d6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -77,6 +77,7 @@ set_dep_root(GFLAGS)
 set_dep_root(GLOG)
 set_dep_root(GPERFTOOLS)
 set_dep_root(GTEST)
+set_dep_root(LIBEV)
 set_dep_root(LLVM)
 set(LLVM_DEBUG_ROOT $ENV{IMPALA_TOOLCHAIN}/llvm-$ENV{IMPALA_LLVM_DEBUG_VERSION})
 set_dep_root(LZ4)
@@ -310,6 +311,11 @@ ADD_THIRDPARTY_LIB(protoc
   STATIC_LIB "${PROTOBUF_PROTOC_STATIC_LIBRARY}"
   DEPS protobuf)
 
+find_package(LibEv REQUIRED)
+include_directories(SYSTEM ${LIBEV_INCLUDE_DIR})
+ADD_THIRDPARTY_LIB(libev
+  STATIC_LIB "${LIBEV_STATIC_LIB}")
+
 ###################################################################
 
 # KuduClient can use GLOG

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a81ad5ea/bin/bootstrap_toolchain.py
----------------------------------------------------------------------
diff --git a/bin/bootstrap_toolchain.py b/bin/bootstrap_toolchain.py
index c8e35de..1434862 100755
--- a/bin/bootstrap_toolchain.py
+++ b/bin/bootstrap_toolchain.py
@@ -350,9 +350,9 @@ if __name__ == "__main__":
     os.makedirs(toolchain_root)
 
   packages = ["avro", "binutils", "boost", "breakpad", "bzip2", "cmake", "gcc", "gflags",
-      "glog", "gperftools", "gtest", "kudu", "llvm", ("llvm", "3.8.0-asserts-p1"), "lz4",
-      "openldap", "protobuf", "rapidjson", "re2", "snappy", "thrift", "tpc-h", "tpc-ds",
-      "zlib"]
+      "glog", "gperftools", "gtest", "kudu", "libev", "llvm",
+      ("llvm", "3.8.0-asserts-p1"), "lz4", "openldap", "protobuf", "rapidjson", "re2",
+      "snappy", "thrift", "tpc-h", "tpc-ds", "zlib"]
   bootstrap(toolchain_root, packages)
 
   # Download the CDH components if necessary.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a81ad5ea/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index c980ea3..2e72027 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -88,6 +88,7 @@ export IMPALA_GFLAGS_VERSION=2.0
 export IMPALA_GLOG_VERSION=0.3.2-p2
 export IMPALA_GPERFTOOLS_VERSION=2.5
 export IMPALA_GTEST_VERSION=1.6.0
+export IMPALA_LIBEV_VERSION=4.20
 export IMPALA_LLVM_VERSION=3.8.0-p1
 export IMPALA_LLVM_ASAN_VERSION=3.8.0-p1
 # Debug builds should use the release+asserts build to get additional coverage.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a81ad5ea/cmake_modules/FindLibEv.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindLibEv.cmake b/cmake_modules/FindLibEv.cmake
new file mode 100644
index 0000000..40443f1
--- /dev/null
+++ b/cmake_modules/FindLibEv.cmake
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# - Find LIBEV (ev++.h, libev.a, and libev.so)
+# This module defines
+#  LIBEV_INCLUDE_DIR, directory containing headers
+#  LIBEV_SHARED_LIB, path to libev's shared library
+#  LIBEV_STATIC_LIB, path to libev's static library
+#  LIBEV_FOUND, whether libev has been found
+
+find_path(LIBEV_INCLUDE_DIR ev++.h PATHS
+  ${LIBEV_ROOT}/include
+  # make sure we don't accidentally pick up a different version
+  NO_CMAKE_SYSTEM_PATH
+  NO_SYSTEM_ENVIRONMENT_PATH)
+find_library(LIBEV_SHARED_LIB ev PATHS
+  ${LIBEV_ROOT}/lib
+  NO_CMAKE_SYSTEM_PATH
+  NO_SYSTEM_ENVIRONMENT_PATH)
+find_library(LIBEV_STATIC_LIB libev.a PATHS
+  ${LIBEV_ROOT}/lib
+  NO_CMAKE_SYSTEM_PATH
+  NO_SYSTEM_ENVIRONMENT_PATH)
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(LIBEV REQUIRED_VARS
+  LIBEV_SHARED_LIB LIBEV_STATIC_LIB LIBEV_INCLUDE_DIR)


[2/4] incubator-impala git commit: IMPALA-4036: invalid SQL generated for partitioned table with comment

Posted by he...@apache.org.
IMPALA-4036: invalid SQL generated for partitioned table with comment

For a table that has both a table comment and partition columns specified,
"show create table" incorrectly outputs the comment before the partition
clause. This is not the correct order, and it results in invalid SQL.

This change fixes the ordering (the partition clause now comes before the
comment) and adds tests for this case.

Change-Id: I29a33cfd142b473997fdc3acfe3f0966bc7ed784
Reviewed-on: http://gerrit.cloudera.org:8080/5648
Tested-by: Impala Public Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/57552619
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/57552619
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/57552619

Branch: refs/heads/master
Commit: 5755261954d71b05b5e56c8659edd17de88b2d93
Parents: 8b7f876
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Mon Jan 9 13:46:22 2017 -0800
Committer: Henry Robinson <he...@cloudera.com>
Committed: Thu Jan 12 20:41:35 2017 +0000

----------------------------------------------------------------------
 fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java    | 3 ++-
 fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java     | 6 ++++--
 .../functional-query/queries/QueryTest/show-create-table.test  | 2 ++
 3 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57552619/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index 35f7e79..a922c8b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -264,7 +264,6 @@ public class ToSqlUtils {
       sb.append("\n)");
     }
     sb.append("\n");
-    if (tableComment != null) sb.append(" COMMENT '" + tableComment + "'\n");
 
     if (partitionColumnsSql != null && partitionColumnsSql.size() > 0) {
       sb.append(String.format("PARTITIONED BY (\n  %s\n)\n",
@@ -275,6 +274,8 @@ public class ToSqlUtils {
       sb.append("PARTITION BY " + kuduPartitionByParams + "\n");
     }
 
+    if (tableComment != null) sb.append(" COMMENT '" + tableComment + "'\n");
+
     if (rowFormat != null && !rowFormat.isDefault()) {
       sb.append("ROW FORMAT DELIMITED");
       if (rowFormat.getFieldDelimiter() != null) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57552619/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 9514cc6..2b52a68 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -296,9 +296,11 @@ public class ToSqlTest extends FrontendTestBase {
 
   @Test
   public void TestCreateTable() throws AnalysisException {
-    testToSql("create table p (a int)",
+    testToSql("create table p (a int) partitioned by (day string) " +
+        "comment 'This is a test'",
         "default",
-        "CREATE TABLE default.p ( a INT ) STORED AS TEXTFILE", true);
+        "CREATE TABLE default.p ( a INT ) PARTITIONED BY ( day STRING ) " +
+        "COMMENT 'This is a test' STORED AS TEXTFILE", true);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57552619/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index 252cd96..946d229 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -71,6 +71,7 @@ PARTITIONED BY (
   y INT,
   a BOOLEAN
 )
+COMMENT 'This is a test'
 STORED AS TEXTFILE
 ---- RESULTS
 CREATE TABLE show_create_table_test_db.test3 (
@@ -93,6 +94,7 @@ PARTITIONED BY (
   y INT,
   a BOOLEAN
 )
+COMMENT 'This is a test'
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
 ====