You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/01/18 04:27:43 UTC

[03/12] impala git commit: IMPALA-6386: Invalidate metadata at table level for dataload

IMPALA-6386: Invalidate metadata at table level for dataload

Dataload currently executes bin/load-data.py for TPC-H,
TPC-DS, and functional-query concurrently. One of the final
steps for bin/load-data.py is to run a global "invalidate
metadata". Global "invalidate metadata" commands are known
to cause problems on concurrent systems. See IMPALA-5087.
For dataload, if TPC-H executes "invalidate metadata" while
TPC-DS is still creating tables and adding partitions,
the TPC-DS executor might erroneously believe that a table
does not exist.

This changes dataload to invalidate metadata at an
individual table level rather than globally. This
prevents the concurrency issue.

This also changes the names of some of the intermediate
SQL files generated by generate-schema-statements.py
and consumed by load-data.py to make them less confusing.

Change-Id: Ibc3a6d8a674a0bf6b02069bfe8a5e12034335b1f
Reviewed-on: http://gerrit.cloudera.org:8080/9009
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d9b6fd07
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d9b6fd07
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d9b6fd07

Branch: refs/heads/master
Commit: d9b6fd073055b436c7404d49454dc215b2c7a369
Parents: dcc7be0
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Thu Jan 11 15:09:52 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Jan 17 22:52:58 2018 +0000

----------------------------------------------------------------------
 bin/load-data.py                           | 21 ++++++++-------------
 testdata/bin/generate-schema-statements.py | 14 ++++++++++----
 2 files changed, 18 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d9b6fd07/bin/load-data.py
----------------------------------------------------------------------
diff --git a/bin/load-data.py b/bin/load-data.py
index ede604a..273fe4d 100755
--- a/bin/load-data.py
+++ b/bin/load-data.py
@@ -278,15 +278,6 @@ def exec_impala_query_from_file_parallel(query_files):
   # finished putting their results in the queue.
   for thread in threads: thread.join()
 
-def invalidate_impala_metadata():
-  print "Invalidating Metadata"
-  impala_client = ImpalaBeeswaxClient(options.impalad, use_kerberos=options.use_kerberos)
-  impala_client.connect()
-  try:
-    impala_client.execute('invalidate metadata')
-  finally:
-    impala_client.close_connection()
-
 if __name__ == "__main__":
   # Having the actual command line at the top of each data-load-* log can help
   # when debugging dataload issues.
@@ -324,8 +315,8 @@ if __name__ == "__main__":
     load_file_substr = "%s-%s" % (workload, options.exploration_strategy)
     # Data loading with Impala is done in parallel, each file format has a separate query
     # file.
-    create_filename = '%s-impala-generated' % load_file_substr
-    load_filename = '%s-impala-load-generated' % load_file_substr
+    create_filename = 'create-%s-impala-generated' % load_file_substr
+    load_filename = 'load-%s-impala-generated' % load_file_substr
     impala_create_files = [f for f in dataset_dir_contents if create_filename in f]
     impala_load_files = [f for f in dataset_dir_contents if load_filename in f]
 
@@ -340,11 +331,15 @@ if __name__ == "__main__":
     exec_hive_query_from_file('load-%s-hive-generated.sql' % load_file_substr)
     exec_hbase_query_from_file('post-load-%s-hbase-generated.sql' % load_file_substr)
 
-    if impala_load_files: invalidate_impala_metadata()
+    # Invalidate so that Impala sees the loads done by Hive before loading Parquet/Kudu
+    # Note: This only invalidates tables for this workload.
+    invalidate_sql_file = 'invalidate-{0}-impala-generated.sql'.format(load_file_substr)
+    if impala_load_files: exec_impala_query_from_file(invalidate_sql_file)
     exec_impala_query_from_file_parallel(impala_load_files)
+    # Final invalidate for this workload
+    exec_impala_query_from_file(invalidate_sql_file)
     loading_time_map[workload] = time.time() - start_time
 
-  invalidate_impala_metadata()
   total_time = 0.0
   for workload, load_time in loading_time_map.iteritems():
     total_time += load_time

http://git-wip-us.apache.org/repos/asf/impala/blob/d9b6fd07/testdata/bin/generate-schema-statements.py
----------------------------------------------------------------------
diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py
index b8f6e8c..28f9fd1 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -482,13 +482,14 @@ def generate_statements(output_name, test_vectors, sections,
   hive_output = Statements()
   hbase_output = Statements()
   hbase_post_load = Statements()
+  impala_invalidate = Statements()
 
   table_names = None
   if options.table_names:
     table_names = [name.lower() for name in options.table_names.split(',')]
   existing_tables = get_hdfs_subdirs_with_data(options.hive_warehouse_dir)
   for row in test_vectors:
-    impala_output = Statements()
+    impala_create = Statements()
     impala_load = Statements()
     file_format, data_set, codec, compression_type =\
         [row.file_format, row.dataset, row.compression_codec, row.compression_type]
@@ -581,7 +582,7 @@ def generate_statements(output_name, test_vectors, sections,
         if file_format not in IMPALA_SUPPORTED_INSERT_FORMATS:
           create_file_format = 'text'
 
-      output = impala_output
+      output = impala_create
       if create_hive or file_format == 'hbase':
         output = hive_output
       elif codec == 'lzo':
@@ -633,6 +634,10 @@ def generate_statements(output_name, test_vectors, sections,
             column_families))
         hbase_post_load.load.append("flush '%s_hbase.%s'\n" % (db_name, table_name))
 
+      # Need to emit an "invalidate metadata" for each individual table
+      invalidate_table_stmt = "INVALIDATE METADATA {0}.{1};\n".format(db, table_name)
+      impala_invalidate.create.append(invalidate_table_stmt)
+
       # The ALTER statement in hive does not accept fully qualified table names so
       # insert a use statement. The ALTER statement is skipped for HBASE as it's
       # used for adding partitions.
@@ -688,9 +693,9 @@ def generate_statements(output_name, test_vectors, sections,
           else:
             print 'Empty insert for table %s. Skipping insert generation' % table_name
 
-    impala_output.write_to_file("load-%s-impala-generated-%s-%s-%s.sql" %
+    impala_create.write_to_file("create-%s-impala-generated-%s-%s-%s.sql" %
         (output_name, file_format, codec, compression_type))
-    impala_load.write_to_file("load-%s-impala-load-generated-%s-%s-%s.sql" %
+    impala_load.write_to_file("load-%s-impala-generated-%s-%s-%s.sql" %
         (output_name, file_format, codec, compression_type))
 
 
@@ -699,6 +704,7 @@ def generate_statements(output_name, test_vectors, sections,
   hbase_output.write_to_file('load-' + output_name + '-hbase-generated.create')
   hbase_post_load.load.append("exit")
   hbase_post_load.write_to_file('post-load-' + output_name + '-hbase-generated.sql')
+  impala_invalidate.write_to_file('invalidate-' + output_name + '-impala-generated.sql')
 
 def parse_schema_template_file(file_name):
   VALID_SECTION_NAMES = ['DATASET', 'BASE_TABLE_NAME', 'COLUMNS', 'PARTITION_COLUMNS',