You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/12 22:10:11 UTC

[36/50] [abbrv] incubator-impala git commit: IMPALA-3452: S3: Disable Impala staging for INSERTs via flag for speedup

IMPALA-3452: S3: Disable Impala staging for INSERTs via flag for speedup

INSERTs on S3 are slower because of double buffering where we buffer
once locally and once in a staging directoy in S3 before moving the
file(s) to the final location. Also, moving the file from the staging
directory to the final location in HDFS is a quick rename which is
only a metadata operation. However, on S3, renames are not supported,
thus becoming a full file copy instead of just a metadata rename
operation.

This patch instroduces a boolean query option "s3_skip_insert_staging"
which avoids the staging step on S3 and allows the sinks to write to
the final location directly.

This trades in consistency for the sake of performance. If a node(s)
fails during the query, then we will end up with inconsistent results
in the final location.

P.S: This option is disabled for INSERT OVERWRITE queries as that
would require cleaning the destination directory before moving the
final files there. However, the coordinator is responsible for the
cleaning which takes place only after the table sinks have moved
the files to the final location. Thus, INSERT OVERWRITE queries must
still have their files moved to a staging location by the table sinks.

Performance gains:
 - For non-partitioned tables, the INSERT queries run 4-4.5x faster on
   S3. (Tested on a 63GB INSERT to a table)
 - For heavily partitioned tables, there is considerable improvement
   in the order of 4-5 minutes on queries that take ~27 minutes but
   queries are still slow because of IMPALA-3482 where the catalog
   takes too long to update all the metadata. (Tested with a query
   that creates 2.4K partitions in a table totalling ~19GB).

Change-Id: Iff9620d41ba0d5fb1aa0c9f4abb48866fc2b0698
Reviewed-on: http://gerrit.cloudera.org:8080/2905
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/27815818
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/27815818
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/27815818

Branch: refs/heads/master
Commit: 27815818b92362bc8b913a3da789c6debc88d551
Parents: 616eb2f
Author: Sailesh Mukil <sa...@cloudera.com>
Authored: Fri Apr 29 12:30:59 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 14:18:00 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hdfs-table-sink.cc                  | 37 ++++++----
 be/src/exec/hdfs-table-sink.h                   |  9 +++
 be/src/runtime/coordinator.cc                   |  4 +-
 be/src/service/query-options.cc                 |  5 ++
 be/src/service/query-options.h                  |  5 +-
 common/thrift/ImpalaInternalService.thrift      |  6 ++
 common/thrift/ImpalaService.thrift              | 13 +++-
 .../queries/QueryTest/insert.test               | 76 ++++++++++++++++++++
 8 files changed, 137 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index f345b73..d3a2ade 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -278,11 +278,21 @@ void HdfsTableSink::BuildHdfsFileNames(
 Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
     OutputPartition* output_partition) {
   SCOPED_TIMER(ADD_TIMER(profile(), "TmpFileCreateTimer"));
-  stringstream filename;
-  filename << output_partition->tmp_hdfs_file_name_prefix
-           << "." << output_partition->num_files
-           << "." << output_partition->writer->file_extension();
-  output_partition->current_file_name = filename.str();
+  string final_location = Substitute("$0.$1.$2",
+      output_partition->final_hdfs_file_name_prefix, output_partition->num_files,
+      output_partition->writer->file_extension());
+
+  // If ShouldSkipStaging() is true, then the table sink will write the file(s) for this
+  // partition to the final location directly. If it is false, the file(s) will be written
+  // to a temporary staging location which will be moved by the coordinator to the final
+  // location.
+  if (ShouldSkipStaging(state, output_partition)) {
+    output_partition->current_file_name = final_location;
+  } else {
+    output_partition->current_file_name = Substitute("$0.$1.$2",
+      output_partition->tmp_hdfs_file_name_prefix, output_partition->num_files,
+      output_partition->writer->file_extension());
+  }
   // Check if tmp_hdfs_file_name exists.
   const char* tmp_hdfs_file_name_cstr =
       output_partition->current_file_name.c_str();
@@ -323,12 +333,10 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
   ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(1);
   COUNTER_ADD(files_created_counter_, 1);
 
-  // Save the ultimate destination for this file (it will be moved by the coordinator)
-  stringstream dest;
-  dest << output_partition->final_hdfs_file_name_prefix
-       << "." << output_partition->num_files
-       << "." << output_partition->writer->file_extension();
-  (*state->hdfs_files_to_move())[output_partition->current_file_name] = dest.str();
+  if (!ShouldSkipStaging(state, output_partition)) {
+    // Save the ultimate destination for this file (it will be moved by the coordinator)
+    (*state->hdfs_files_to_move())[output_partition->current_file_name] = final_location;
+  }
 
   ++output_partition->num_files;
   output_partition->num_rows = 0;
@@ -498,7 +506,7 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state,
     state->per_partition_status()->insert(
         make_pair(partition->partition_name, partition_status));
 
-    if (!no_more_rows) {
+    if (!no_more_rows && ShouldSkipStaging(state, partition)) {
       // Indicate that temporary directory is to be deleted after execution
       (*state->hdfs_files_to_move())[partition->tmp_hdfs_dir_name] = "";
     }
@@ -660,6 +668,11 @@ void HdfsTableSink::Close(RuntimeState* state) {
   closed_ = true;
 }
 
+bool HdfsTableSink::ShouldSkipStaging(RuntimeState* state, OutputPartition* partition) {
+  return IsS3APath(partition->final_hdfs_file_name_prefix.c_str()) && !overwrite_ &&
+      state->query_options().s3_skip_insert_staging;
+}
+
 string HdfsTableSink::DebugString() const {
   stringstream out;
   out << "HdfsTableSink(overwrite=" << (overwrite_ ? "true" : "false")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/be/src/exec/hdfs-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 2083ab2..a06e66d 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -56,6 +56,7 @@ struct OutputPartition {
 
   /// Name of the temporary directory that files for this partition are staged to before
   /// the coordinator moves them to their permanent location once the query completes.
+  /// Not used if 'skip_staging' is true.
   /// Path: <base_table_dir/<staging_dir>/<unique_id>_dir/
   std::string tmp_hdfs_dir_name;
 
@@ -210,6 +211,14 @@ class HdfsTableSink : public DataSink {
   /// Closes the hdfs file for this partition as well as the writer.
   void ClosePartitionFile(RuntimeState* state, OutputPartition* partition);
 
+  // Returns TRUE if the staging step should be skipped for this partition. This allows
+  // for faster INSERT query completion time for the S3A filesystem as the coordinator
+  // does not have to copy the file(s) from the staging locaiton to the final location. We
+  // do not skip for INSERT OVERWRITEs because the coordinator will delete all files in
+  // the final location before moving the staged files there, so we cannot write directly
+  // to the final location and need to write to the temporary staging location.
+  bool ShouldSkipStaging(RuntimeState* state, OutputPartition* partition);
+
   /// Descriptor of target table. Set in Prepare().
   const HdfsTableDescriptor* table_desc_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 6753c2b..2af596c 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -861,7 +861,9 @@ Status Coordinator::FinalizeSuccessfulInsert() {
           partition_create_ops.Add(CREATE_DIR, part_path);
         }
       }
-    } else {
+    } else if (!is_s3_path || !query_ctx_.request.query_options.s3_skip_insert_staging) {
+      // If the S3_SKIP_INSERT_STAGING query option is set, then the partition directories
+      // would have already been created by the table sinks.
       if (FLAGS_insert_inherit_permissions && !is_s3_path) {
         PopulatePathPermissionCache(
             partition_fs_connection, part_path, &permissions_cache);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 45571bf..ce538bf 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -400,6 +400,11 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_mt_num_cores(num_cores);
         break;
       }
+      case TImpalaQueryOptions::S3_SKIP_INSERT_STAGING: {
+        query_options->__set_s3_skip_insert_staging(
+            iequals(value, "true") || iequals(value, "1"));
+        break;
+      }
       default:
         // We hit this DCHECK(false) if we forgot to add the corresponding entry here
         // when we add a new query option.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index a727c8d..56e2e1a 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -32,7 +32,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::MT_NUM_CORES + 1);\
+      TImpalaQueryOptions::S3_SKIP_INSERT_STAGING + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -76,7 +76,8 @@ class TQueryOptions;
   QUERY_OPT_FN(max_num_runtime_filters, MAX_NUM_RUNTIME_FILTERS)\
   QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8)\
   QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION)\
-  QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES);
+  QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES)\
+  QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING);
 
 /// Converts a TQueryOptions struct into a map of key, value pairs.
 void TQueryOptionsToMap(const TQueryOptions& query_options,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 6c2fc3e..611155c 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -186,6 +186,12 @@ struct TQueryOptions {
   // 1: single-threaded execution mode
   // 0: multi-threaded execution mode, number of cores is the pool default
   44: optional i32 mt_num_cores = 1
+
+  // If true, INSERT writes to S3 go directly to their final location rather than being
+  // copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for
+  // those queries, the coordinator deletes all files in the final location before copying
+  // the files there.
+  45: optional bool s3_skip_insert_staging = true
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index c9535eb..0a030ad 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -204,14 +204,21 @@ enum TImpalaQueryOptions {
 
   // If true, use UTF-8 annotation for string columns. Note that char and varchar columns
   // always use the annotation.
-  PARQUET_ANNOTATE_STRINGS_UTF8
+  PARQUET_ANNOTATE_STRINGS_UTF8,
 
   // Determines how to resolve Parquet files' schemas in the absence of field IDs (which
   // is always, since fields IDs are NYI). Valid values are "position" and "name".
-  PARQUET_FALLBACK_SCHEMA_RESOLUTION
+  PARQUET_FALLBACK_SCHEMA_RESOLUTION,
 
   // Multi-threaded execution: number of cores per machine
-  MT_NUM_CORES
+  MT_NUM_CORES,
+
+  // If true, INSERT writes to S3 go directly to their final location rather than being
+  // copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for
+  // those queries, the coordinator deletes all files in the final location before copying
+  // the files there.
+  // TODO: Find a way to get this working for INSERT OVERWRITEs too.
+  S3_SKIP_INSERT_STAGING
 }
 
 // The summary of an insert.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/testdata/workloads/functional-query/queries/QueryTest/insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/insert.test b/testdata/workloads/functional-query/queries/QueryTest/insert.test
index 29450cd..157fd9a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/insert.test
@@ -768,3 +768,79 @@ select * from table_with_header_insert;
 ---- TYPES
 INT
 ====
+---- QUERY
+# The following 4 queries are to test IMPALA-3452 which test S3 INSERTs with staging.
+SET S3_SKIP_INSERT_STAGING=false;
+# static partition overwrite
+insert overwrite table alltypesinsert
+partition (year=2009, month=4)
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
+float_col, double_col, date_string_col, string_col, timestamp_col
+from alltypessmall
+where year=2009 and month=4
+---- SETUP
+DROP PARTITIONS alltypesinsert
+RESET alltypesinsert
+---- RESULTS
+year=2009/month=4/: 25
+====
+---- QUERY
+# search the overwritten partition to verify the results
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col,
+double_col, date_string_col, string_col
+from alltypesinsert
+where year=2009 and month=4
+---- RESULTS
+75,false,0,0,0,0,0,0,'04/01/09','0'
+76,true,1,1,1,10,1.100000023841858,10.1,'04/01/09','1'
+77,false,2,2,2,20,2.200000047683716,20.2,'04/01/09','2'
+78,true,3,3,3,30,3.299999952316284,30.3,'04/01/09','3'
+79,false,4,4,4,40,4.400000095367432,40.4,'04/01/09','4'
+80,true,5,5,5,50,5.5,50.5,'04/01/09','5'
+81,false,6,6,6,60,6.599999904632568,60.6,'04/01/09','6'
+82,true,7,7,7,70,7.699999809265137,70.7,'04/01/09','7'
+83,false,8,8,8,80,8.800000190734863,80.8,'04/01/09','8'
+84,true,9,9,9,90,9.899999618530273,90.90000000000001,'04/01/09','9'
+85,false,0,0,0,0,0,0,'04/02/09','0'
+86,true,1,1,1,10,1.100000023841858,10.1,'04/02/09','1'
+87,false,2,2,2,20,2.200000047683716,20.2,'04/02/09','2'
+88,true,3,3,3,30,3.299999952316284,30.3,'04/02/09','3'
+89,false,4,4,4,40,4.400000095367432,40.4,'04/02/09','4'
+90,true,5,5,5,50,5.5,50.5,'04/02/09','5'
+91,false,6,6,6,60,6.599999904632568,60.6,'04/02/09','6'
+92,true,7,7,7,70,7.699999809265137,70.7,'04/02/09','7'
+93,false,8,8,8,80,8.800000190734863,80.8,'04/02/09','8'
+94,true,9,9,9,90,9.899999618530273,90.90000000000001,'04/02/09','9'
+95,false,0,0,0,0,0,0,'04/03/09','0'
+96,true,1,1,1,10,1.100000023841858,10.1,'04/03/09','1'
+97,false,2,2,2,20,2.200000047683716,20.2,'04/03/09','2'
+98,true,3,3,3,30,3.299999952316284,30.3,'04/03/09','3'
+99,false,4,4,4,40,4.400000095367432,40.4,'04/03/09','4'
+---- TYPES
+int, boolean, tinyint, smallint, int, bigint, float, double, string, string
+====
+---- QUERY
+SET S3_SKIP_INSERT_STAGING=false;
+# fully dynamic partition insert$TABLE, check partition creation
+insert into table alltypesinsert
+partition (year, month)
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
+float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+from alltypessmall
+---- SETUP
+DROP PARTITIONS alltypesinsert
+---- RESULTS
+year=2009/month=1/: 25
+year=2009/month=2/: 25
+year=2009/month=3/: 25
+year=2009/month=4/: 25
+====
+---- QUERY
+# search the partitions to verify they contain all 100 rows
+select count(timestamp_col) from alltypesinsert
+where year=2009 and month>=1 and month<=4
+---- RESULTS
+100
+---- TYPES
+bigint
+====