Posted to commits@impala.apache.org by sa...@apache.org on 2016/08/20 03:10:02 UTC

[1/2] incubator-impala git commit: IMPALA-2988: Refactor HdfsTableSink::Close() so that it cannot fail

Repository: incubator-impala
Updated Branches:
  refs/heads/master 2aa86309d -> 1522da351


IMPALA-2988: Refactor HdfsTableSink::Close() so that it cannot fail

HdfsTableSink::Close() makes calls to functions that can fail with a
Status. However, since Close() has a void return type, these error
statuses are only logged, and no action can be taken based on the type
of error.

This patch moves the closing of the partition file from Close() into
FlushFinal(), so that if closing the file fails, the error is
propagated up and can be acted upon.

We still try to close all the partition files in the map in Close() as
well, because if a query is cancelled, FlushFinal() is never called and
we would otherwise leak file descriptors. A sketch of the resulting
control flow follows below.
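
The control-flow change can be sketched as follows. This is a minimal,
self-contained illustration, not Impala code: the Status class and the
RETURN_IF_ERROR macro below are simplified stand-ins for Impala's real
ones, and the hdfs_close_failed flag is a placeholder for the result of
hdfsCloseFile().

    #include <iostream>
    #include <string>

    // Simplified stand-in for Impala's Status.
    struct Status {
      static Status OK() { return Status(); }
      Status() : ok_(true) {}
      explicit Status(const std::string& msg) : ok_(false), msg_(msg) {}
      bool ok() const { return ok_; }
      const std::string& msg() const { return msg_; }
     private:
      bool ok_;
      std::string msg_;
    };

    // Simplified stand-in for Impala's RETURN_IF_ERROR.
    #define RETURN_IF_ERROR(stmt)            \
      do {                                   \
        Status _s = (stmt);                  \
        if (!_s.ok()) return _s;             \
      } while (false)

    // After the refactor, the close path reports failure via Status...
    Status ClosePartitionFile(bool hdfs_close_failed /* placeholder */) {
      if (hdfs_close_failed) return Status("Failed to close HDFS file");
      return Status::OK();
    }

    // ...FlushFinal() propagates it, so a failed close fails the query...
    Status FlushFinal() {
      RETURN_IF_ERROR(ClosePartitionFile(false));
      return Status::OK();
    }

    // ...while Close() itself cannot fail: it logs the error and keeps
    // going, so a cancelled query (which never reaches FlushFinal())
    // still releases its file descriptors.
    void Close() {
      Status s = ClosePartitionFile(false);
      if (!s.ok()) std::cerr << s.msg() << std::endl;
    }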

This patch also fixes some overly long lines.

Change-Id: I2546bc68ba136b2713d744c1b920878606a2217b
Reviewed-on: http://gerrit.cloudera.org:8080/4018
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/02608f89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/02608f89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/02608f89

Branch: refs/heads/master
Commit: 02608f89f5ee3385ef63533eb012cb75690d0b5d
Parents: 2aa8630
Author: Sailesh Mukil <sa...@cloudera.com>
Authored: Wed Mar 30 21:58:48 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Aug 20 01:53:24 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-table-sink.cc | 22 ++++++++++++----------
 be/src/exec/hdfs-table-sink.h  |  6 +++---
 2 files changed, 15 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/02608f89/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 0626bd4..fb547b4 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -340,7 +340,7 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
   output_partition->num_rows = 0;
   Status status = output_partition->writer->InitNewFile();
   if (!status.ok()) {
-    ClosePartitionFile(state, output_partition);
+    status.MergeStatus(ClosePartitionFile(state, output_partition));
     hdfsDelete(output_partition->hdfs_connection,
         output_partition->current_file_name.c_str(), 0);
   }
@@ -608,24 +608,27 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state,
     DataSink::MergeInsertStats(partition->writer->stats(), &it->second.stats);
   }
 
-  ClosePartitionFile(state, partition);
+  RETURN_IF_ERROR(ClosePartitionFile(state, partition));
   return Status::OK();
 }
 
-void HdfsTableSink::ClosePartitionFile(RuntimeState* state, OutputPartition* partition) {
-  if (partition->tmp_hdfs_file == NULL) return;
+Status HdfsTableSink::ClosePartitionFile(
+    RuntimeState* state, OutputPartition* partition) {
+  if (partition->tmp_hdfs_file == NULL) return Status::OK();
   int hdfs_ret = hdfsCloseFile(partition->hdfs_connection, partition->tmp_hdfs_file);
   VLOG_FILE << "hdfsCloseFile() file=" << partition->current_file_name;
+  partition->tmp_hdfs_file = NULL;
+  ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(-1);
   if (hdfs_ret != 0) {
-    state->LogError(ErrorMsg(TErrorCode::GENERAL,
+    return Status(ErrorMsg(TErrorCode::GENERAL,
         GetHdfsErrorMsg("Failed to close HDFS file: ",
         partition->current_file_name)));
   }
-  partition->tmp_hdfs_file = NULL;
-  ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(-1);
+  return Status::OK();
 }
 
 Status HdfsTableSink::FlushFinal(RuntimeState* state) {
+  DCHECK(!closed_);
   SCOPED_TIMER(profile()->total_time_counter());
 
   if (dynamic_partition_key_expr_ctxs_.empty()) {
@@ -643,8 +646,6 @@ Status HdfsTableSink::FlushFinal(RuntimeState* state) {
     RETURN_IF_ERROR(FinalizePartitionFile(state, cur_partition->second.first));
   }
 
-  // TODO: Move call to ClosePartitionFile() here so that the error status can be
-  // propagated. If closing the file fails, the query should fail.
   return Status::OK();
 }
 
@@ -658,7 +659,8 @@ void HdfsTableSink::Close(RuntimeState* state) {
     if (cur_partition->second.first->writer.get() != NULL) {
       cur_partition->second.first->writer->Close();
     }
-    ClosePartitionFile(state, cur_partition->second.first);
+    Status close_status = ClosePartitionFile(state, cur_partition->second.first);
+    if (!close_status.ok()) state->LogError(close_status.msg());
   }
   partition_keys_to_output_partitions_.clear();
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/02608f89/be/src/exec/hdfs-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index fc32f4d..bb2f9d7 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -206,12 +206,12 @@ class HdfsTableSink : public DataSink {
   void BuildHdfsFileNames(const HdfsPartitionDescriptor& partition_descriptor,
       OutputPartition* output);
 
-  /// Updates runtime stats of HDFS with rows written, then closes the file associated with
-  /// the partition by calling ClosePartitionFile()
+  /// Updates runtime stats of HDFS with rows written, then closes the file associated
+  /// with the partition by calling ClosePartitionFile()
   Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition);
 
   /// Closes the hdfs file for this partition as well as the writer.
-  void ClosePartitionFile(RuntimeState* state, OutputPartition* partition);
+  Status ClosePartitionFile(RuntimeState* state, OutputPartition* partition);
 
   // Returns TRUE if the staging step should be skipped for this partition. This allows
   // for faster INSERT query completion time for the S3A filesystem as the coordinator


[2/2] incubator-impala git commit: IMPALA-3662: Don't double allocate tuple buffer in parquet scanner

Posted by sa...@apache.org.
IMPALA-3662: Don't double allocate tuple buffer in parquet scanner

HdfsScanner::StartNewRowBatch() is called once per row batch
by the parquet scanner to allocate a new row batch along with
its tuple buffer. Similarly, a scratch batch, which also
contains a tuple buffer, is created for each row batch in
HdfsParquetScanner::AssembleRows(). In practice, only the
tuple buffer in the scratch batch is used, so the tuple buffer
allocated by HdfsScanner::StartNewRowBatch() is wasted memory
for the parquet scanner.

This change fixes the problem above by implementing
HdfsParquetScanner::StartNewParquetRowBatch(), which creates
a new row batch without allocating the tuple buffer.
With this patch, the memory consumption when
materializing very wide tuples is reduced by half.
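
A rough sketch of the allocation pattern (again not Impala code:
RowBatch here is a simplified stand-in, and the batch and tuple sizes
are made-up numbers chosen to make the waste visible):

    #include <cstdint>
    #include <memory>
    #include <vector>

    // Simplified stand-in for Impala's RowBatch.
    struct RowBatch {
      explicit RowBatch(int capacity) : capacity(capacity) {}
      int capacity;
      std::vector<uint8_t> tuple_mem;  // tuple buffer, empty until resized
    };

    constexpr int kBatchSize = 1024;   // rows per batch (illustrative)
    constexpr int kTupleBytes = 4096;  // a very wide tuple (illustrative)

    // Base-class behavior: every new batch also gets a tuple buffer.
    std::unique_ptr<RowBatch> StartNewRowBatch() {
      auto batch = std::make_unique<RowBatch>(kBatchSize);
      // ~4 MB per batch that the parquet scanner never touches.
      batch->tuple_mem.resize(kBatchSize * kTupleBytes);
      return batch;
    }

    // Parquet-specific behavior after the fix: only the batch is
    // created; rows are materialized into the scratch batch's tuple
    // buffer instead, so the allocation above is skipped entirely.
    std::unique_ptr<RowBatch> StartNewParquetRowBatch() {
      return std::make_unique<RowBatch>(kBatchSize);
    }

Since the scratch batch already carries a tuple buffer of the same
size, skipping the second allocation roughly halves the per-batch
memory for wide tuples, which matches the reduction described above.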

Change-Id: I826061a2be10fd0528ca4dd1e97146e3cb983370
Reviewed-on: http://gerrit.cloudera.org:8080/4064
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1522da35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1522da35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1522da35

Branch: refs/heads/master
Commit: 1522da3510a36635e3fc694b26211554fcd2793a
Parents: 02608f8
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Aug 18 22:31:34 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Aug 20 03:03:10 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 10 ++++++++--
 be/src/exec/hdfs-parquet-scanner.h  |  5 +++++
 be/src/exec/hdfs-scanner.h          |  2 +-
 3 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1522da35/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index ee5f4d9..2aaf8de 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -301,13 +301,13 @@ Status HdfsParquetScanner::ProcessSplit() {
   DCHECK(add_batches_to_queue_);
   bool scanner_eos = false;
   do {
-    RETURN_IF_ERROR(StartNewRowBatch());
+    StartNewParquetRowBatch();
     RETURN_IF_ERROR(GetNextInternal(batch_, &scanner_eos));
     scan_node_->AddMaterializedRowBatch(batch_);
   } while (!scanner_eos && !scan_node_->ReachedLimit());
 
   // Transfer the remaining resources to this new batch in Close().
-  RETURN_IF_ERROR(StartNewRowBatch());
+  StartNewParquetRowBatch();
   return Status::OK();
 }
 
@@ -529,6 +529,12 @@ Status HdfsParquetScanner::AssembleRows(
   return Status::OK();
 }
 
+void HdfsParquetScanner::StartNewParquetRowBatch() {
+  DCHECK(add_batches_to_queue_);
+  batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
+      scan_node_->mem_tracker());
+}
+
 Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
   DCHECK(dst_batch != NULL);
   dst_batch->CommitRows(num_rows);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1522da35/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 1df09a4..dfd7785 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -446,6 +446,11 @@ class HdfsParquetScanner : public HdfsScanner {
   Status AssembleRows(const std::vector<ParquetColumnReader*>& column_readers,
       RowBatch* row_batch, bool* skip_row_group);
 
+  /// Set 'batch_' to a new row batch. Unlike the similarly named function in
+  /// HdfsScanner, this function will not allocate the tuple buffer. Only valid
+  /// to call if 'add_batches_to_queue_' is true.
+  void StartNewParquetRowBatch();
+
   /// Commit num_rows to the given row batch.
   /// Returns OK if the query is not cancelled and hasn't exceeded any mem limits.
   /// Scanner can call this with 0 rows to flush any pending resources (attached pools

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1522da35/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 53711ab..7ddf0a5 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -282,7 +282,7 @@ class HdfsScanner {
   Status InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,
       THdfsFileFormat::type type, const std::string& scanner_name);
 
-  /// Set batch_ to a new row batch and update tuple_mem_ accordingly.
+  /// Set 'batch_' to a new row batch and update 'tuple_mem_' accordingly.
   /// Only valid to call if 'add_batches_to_queue_' is true.
   Status StartNewRowBatch();