You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2020/01/10 16:25:46 UTC

[impala] branch master updated (1bf9212 -> 641e2ab)

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 1bf9212  IMPALA-9274: cyclic barrier implementation
     new 6e3d121  IMPALA-9035: Simplify casting string to timestamp.
     new 641e2ab  IMPALA-4192: Move static state from DataSink into a DataSinkConfig

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/blocking-plan-root-sink.cc             |  4 +-
 be/src/exec/blocking-plan-root-sink.h              |  2 +-
 be/src/exec/buffered-plan-root-sink.cc             |  8 +-
 be/src/exec/buffered-plan-root-sink.h              |  5 +-
 be/src/exec/data-sink.cc                           | 94 ++++++++++------------
 be/src/exec/data-sink.h                            | 56 ++++++++++---
 be/src/exec/hbase-table-sink.cc                    | 18 +++--
 be/src/exec/hbase-table-sink.h                     | 13 ++-
 be/src/exec/hdfs-table-sink.cc                     | 53 ++++++------
 be/src/exec/hdfs-table-sink.h                      | 24 ++++--
 be/src/exec/kudu-table-sink.cc                     | 20 +++--
 be/src/exec/kudu-table-sink.h                      | 13 ++-
 be/src/exec/nested-loop-join-builder.cc            | 14 +++-
 be/src/exec/nested-loop-join-builder.h             | 19 ++++-
 be/src/exec/nested-loop-join-node.cc               |  5 +-
 be/src/exec/nested-loop-join-node.h                |  4 +-
 be/src/exec/partitioned-hash-join-builder.cc       | 86 +++++++++++++++-----
 be/src/exec/partitioned-hash-join-builder.h        | 76 +++++++++++++++--
 be/src/exec/partitioned-hash-join-node.cc          | 39 ++++-----
 be/src/exec/partitioned-hash-join-node.h           | 10 ++-
 be/src/exec/plan-root-sink.cc                      | 18 ++++-
 be/src/exec/plan-root-sink.h                       | 12 ++-
 be/src/runtime/data-stream-test.cc                 | 14 ++--
 be/src/runtime/fragment-instance-state.cc          |  6 +-
 be/src/runtime/fragment-instance-state.h           |  2 +
 be/src/runtime/krpc-data-stream-sender.cc          | 55 ++++++++-----
 be/src/runtime/krpc-data-stream-sender.h           | 30 ++++---
 .../java/org/apache/impala/analysis/Analyzer.java  |  2 +
 .../rewrite/SimplifyCastStringToTimestamp.java     | 77 ++++++++++++++++++
 .../impala/analysis/ExprRewriteRulesTest.java      | 16 ++++
 30 files changed, 570 insertions(+), 225 deletions(-)
 create mode 100644 fe/src/main/java/org/apache/impala/rewrite/SimplifyCastStringToTimestamp.java


[impala] 02/02: IMPALA-4192: Move static state from DataSink into a DataSinkConfig

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 641e2abf2985972f96a1f27b94758c7bf26e64d5
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Tue Dec 10 17:01:21 2019 -0800

    IMPALA-4192: Move static state from DataSink into a DataSinkConfig
    
    This patch adds a new class called DataSinkConfig, which contains a
    subset of the static state of its corresponding DataSink. There is
    one DataSinkConfig instance per fragment, whereas DataSink contains
    the runtime state and there can be up to MT_DOP instances of it per
    fragment.
    Eventually all static state including codegened function pointers
    would be moved to the PlanNodes.
    
    Testing:
    Ran exhaustive tests successfully.
    
    Change-Id: I8d5b4226f6cec5305b0ec9a25c5f18b5521c8dd2
    Reviewed-on: http://gerrit.cloudera.org:8080/14941
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/blocking-plan-root-sink.cc       |  4 +-
 be/src/exec/blocking-plan-root-sink.h        |  2 +-
 be/src/exec/buffered-plan-root-sink.cc       |  8 +--
 be/src/exec/buffered-plan-root-sink.h        |  5 +-
 be/src/exec/data-sink.cc                     | 94 +++++++++++++---------------
 be/src/exec/data-sink.h                      | 56 +++++++++++++----
 be/src/exec/hbase-table-sink.cc              | 18 +++---
 be/src/exec/hbase-table-sink.h               | 13 +++-
 be/src/exec/hdfs-table-sink.cc               | 53 +++++++++-------
 be/src/exec/hdfs-table-sink.h                | 24 +++++--
 be/src/exec/kudu-table-sink.cc               | 20 +++---
 be/src/exec/kudu-table-sink.h                | 13 +++-
 be/src/exec/nested-loop-join-builder.cc      | 14 ++++-
 be/src/exec/nested-loop-join-builder.h       | 19 +++++-
 be/src/exec/nested-loop-join-node.cc         |  5 +-
 be/src/exec/nested-loop-join-node.h          |  4 +-
 be/src/exec/partitioned-hash-join-builder.cc | 86 +++++++++++++++++++------
 be/src/exec/partitioned-hash-join-builder.h  | 76 +++++++++++++++++++---
 be/src/exec/partitioned-hash-join-node.cc    | 39 +++++-------
 be/src/exec/partitioned-hash-join-node.h     | 10 ++-
 be/src/exec/plan-root-sink.cc                | 18 +++++-
 be/src/exec/plan-root-sink.h                 | 12 +++-
 be/src/runtime/data-stream-test.cc           | 14 ++---
 be/src/runtime/fragment-instance-state.cc    |  6 +-
 be/src/runtime/fragment-instance-state.h     |  2 +
 be/src/runtime/krpc-data-stream-sender.cc    | 55 ++++++++++------
 be/src/runtime/krpc-data-stream-sender.h     | 30 ++++++---
 27 files changed, 475 insertions(+), 225 deletions(-)

diff --git a/be/src/exec/blocking-plan-root-sink.cc b/be/src/exec/blocking-plan-root-sink.cc
index c34a968..fcff3fd 100644
--- a/be/src/exec/blocking-plan-root-sink.cc
+++ b/be/src/exec/blocking-plan-root-sink.cc
@@ -33,8 +33,8 @@ using boost::mutex;
 namespace impala {
 
 BlockingPlanRootSink::BlockingPlanRootSink(
-    TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state)
-  : PlanRootSink(sink_id, row_desc, state) {}
+    TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state)
+  : PlanRootSink(sink_id, sink_config, state) {}
 
 Status BlockingPlanRootSink::Prepare(
     RuntimeState* state, MemTracker* parent_mem_tracker) {
diff --git a/be/src/exec/blocking-plan-root-sink.h b/be/src/exec/blocking-plan-root-sink.h
index 38e4439..0a5b849 100644
--- a/be/src/exec/blocking-plan-root-sink.h
+++ b/be/src/exec/blocking-plan-root-sink.h
@@ -48,7 +48,7 @@ class ScalarExprEvaluator;
 class BlockingPlanRootSink : public PlanRootSink {
  public:
   BlockingPlanRootSink(
-      TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state);
+    TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state);
 
   /// TODO: Currently, this does nothing, it just calls DataSink::Prepare. However, adding
   /// it is necessary because BufferedPlanRootSink needs to use PlanRootSink::Prepare.
diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index 6427bf6..c365f69 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -29,10 +29,10 @@ const int BufferedPlanRootSink::MAX_FETCH_SIZE;
 const int FETCH_NUM_BATCHES = 10;
 
 BufferedPlanRootSink::BufferedPlanRootSink(TDataSinkId sink_id,
-    const RowDescriptor* row_desc, RuntimeState* state,
-    const TBackendResourceProfile& resource_profile, const TDebugOptions& debug_options)
-  : PlanRootSink(sink_id, row_desc, state),
-    resource_profile_(resource_profile),
+    const DataSinkConfig& sink_config, RuntimeState* state,
+    const TDebugOptions& debug_options)
+  : PlanRootSink(sink_id, sink_config, state),
+    resource_profile_(sink_config.tsink_->plan_root_sink.resource_profile),
     debug_options_(debug_options) {}
 
 Status BufferedPlanRootSink::Prepare(
diff --git a/be/src/exec/buffered-plan-root-sink.h b/be/src/exec/buffered-plan-root-sink.h
index c908cda..7be257e 100644
--- a/be/src/exec/buffered-plan-root-sink.h
+++ b/be/src/exec/buffered-plan-root-sink.h
@@ -43,9 +43,8 @@ class DequeRowBatchQueue;
 /// synchronize access to the queue.
 class BufferedPlanRootSink : public PlanRootSink {
  public:
-  BufferedPlanRootSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
-      RuntimeState* state, const TBackendResourceProfile& resource_profile,
-      const TDebugOptions& debug_options);
+  BufferedPlanRootSink(TDataSinkId sink_id, const DataSinkConfig& sink_config,
+      RuntimeState* state, const TDebugOptions& debug_options);
 
   /// Initializes the row_batches_get_wait_timer_ and row_batches_send_wait_timer_
   /// counters.
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index ed3820f..b50d33e 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -39,101 +39,91 @@
 
 #include "common/names.h"
 
-DEFINE_int64(data_stream_sender_buffer_size, 16 * 1024,
-    "(Advanced) Max size in bytes which a row batch in a data stream sender's channel "
-    "can accumulate before the row batch is sent over the wire.");
-
 using strings::Substitute;
 
 namespace impala {
 
-// Empty string
-const char* const DataSink::ROOT_PARTITION_KEY = "";
-
-DataSink::DataSink(TDataSinkId sink_id, const RowDescriptor* row_desc, const string& name,
-    RuntimeState* state)
-  : closed_(false), row_desc_(row_desc), name_(name) {
-  profile_ = RuntimeProfile::Create(state->obj_pool(), name);
-  if (sink_id != -1) {
-    // There is one sink per fragment so we can use the fragment index as a unique
-    // identifier.
-    profile_->SetDataSinkId(sink_id);
-  }
-}
-
-DataSink::~DataSink() {
-  DCHECK(closed_);
+Status DataSinkConfig::Init(
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+  tsink_ = &tsink;
+  input_row_desc_ = input_row_desc;
+  return ScalarExpr::Create(tsink.output_exprs, *input_row_desc_, state, &output_exprs_);
 }
 
-Status DataSink::Create(const TPlanFragmentCtx& fragment_ctx,
-    const TPlanFragmentInstanceCtx& fragment_instance_ctx, const RowDescriptor* row_desc,
-    RuntimeState* state, DataSink** sink) {
-  const TDataSink& thrift_sink = fragment_ctx.fragment.output_sink;
-  const vector<TExpr>& thrift_output_exprs = thrift_sink.output_exprs;
+Status DataSinkConfig::CreateConfig(const TDataSink& thrift_sink,
+    const RowDescriptor* row_desc, RuntimeState* state, const DataSinkConfig** sink) {
   ObjectPool* pool = state->obj_pool();
-  // We have one fragment per sink, so we can use the fragment index as the sink ID.
-  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+  DataSinkConfig* data_sink = nullptr;
   switch (thrift_sink.type) {
     case TDataSinkType::DATA_STREAM_SINK:
       if (!thrift_sink.__isset.stream_sink) return Status("Missing data stream sink.");
       // TODO: figure out good buffer size based on size of output row
-      *sink = pool->Add(new KrpcDataStreamSender(sink_id,
-          fragment_instance_ctx.sender_id, row_desc, thrift_sink.stream_sink,
-          fragment_ctx.destinations, FLAGS_data_stream_sender_buffer_size, state));
+      data_sink = pool->Add(new KrpcDataStreamSenderConfig());
       break;
     case TDataSinkType::TABLE_SINK:
       if (!thrift_sink.__isset.table_sink) return Status("Missing table sink.");
       switch (thrift_sink.table_sink.type) {
         case TTableSinkType::HDFS:
-          *sink =
-              pool->Add(new HdfsTableSink(sink_id, row_desc, thrift_sink, state));
-          break;
-        case TTableSinkType::HBASE:
-          *sink =
-              pool->Add(new HBaseTableSink(sink_id, row_desc, thrift_sink, state));
+          data_sink = pool->Add(new HdfsTableSinkConfig());
           break;
         case TTableSinkType::KUDU:
           RETURN_IF_ERROR(CheckKuduAvailability());
-          *sink =
-              pool->Add(new KuduTableSink(sink_id, row_desc, thrift_sink, state));
+          data_sink = pool->Add(new KuduTableSinkConfig());
+          break;
+        case TTableSinkType::HBASE:
+          data_sink = pool->Add(new HBaseTableSinkConfig());
           break;
         default:
           stringstream error_msg;
           map<int, const char*>::const_iterator i =
               _TTableSinkType_VALUES_TO_NAMES.find(thrift_sink.table_sink.type);
           const char* str = i != _TTableSinkType_VALUES_TO_NAMES.end() ?
-              i->second : "Unknown table sink";
+              i->second :
+              "Unknown table sink";
           error_msg << str << " not implemented.";
           return Status(error_msg.str());
       }
       break;
     case TDataSinkType::PLAN_ROOT_SINK:
-      if (state->query_options().spool_query_results) {
-        *sink = pool->Add(new BufferedPlanRootSink(sink_id, row_desc, state,
-            fragment_ctx.fragment.output_sink.plan_root_sink.resource_profile,
-            fragment_instance_ctx.debug_options));
-      } else {
-        *sink = pool->Add(new BlockingPlanRootSink(sink_id, row_desc, state));
-      }
+      data_sink = pool->Add(new PlanRootSinkConfig());
       break;
     case TDataSinkType::JOIN_BUILD_SINK:
-      // IMPALA-4224 - join build sink not supported in backend execution.
+    // IMPALA-4224 - join build sink not supported in backend execution.
     default:
       stringstream error_msg;
       map<int, const char*>::const_iterator i =
           _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
       const char* str = i != _TDataSinkType_VALUES_TO_NAMES.end() ?
-          i->second :  "Unknown data sink type ";
+          i->second :
+          "Unknown data sink type ";
       error_msg << str << " not implemented.";
       return Status(error_msg.str());
   }
-  RETURN_IF_ERROR((*sink)->Init(thrift_output_exprs, thrift_sink, state));
+  RETURN_IF_ERROR(data_sink->Init(thrift_sink, row_desc, state));
+  *sink = data_sink;
   return Status::OK();
 }
 
-Status DataSink::Init(const vector<TExpr>& thrift_output_exprs,
-    const TDataSink& tsink, RuntimeState* state) {
-  return ScalarExpr::Create(thrift_output_exprs, *row_desc_, state, &output_exprs_);
+// Empty string
+const char* const DataSink::ROOT_PARTITION_KEY = "";
+
+DataSink::DataSink(TDataSinkId sink_id, const DataSinkConfig& sink_config,
+    const string& name, RuntimeState* state)
+  : sink_config_(sink_config),
+    closed_(false),
+    row_desc_(sink_config.input_row_desc_),
+    name_(name),
+    output_exprs_(sink_config.output_exprs_) {
+  profile_ = RuntimeProfile::Create(state->obj_pool(), name);
+  if (sink_id != -1) {
+    // There is one sink per fragment so we can use the fragment index as a unique
+    // identifier.
+    profile_->SetDataSinkId(sink_id);
+  }
+}
+
+DataSink::~DataSink() {
+  DCHECK(closed_);
 }
 
 Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index 60968c4..1f06625 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -29,6 +29,7 @@
 
 namespace impala {
 
+class DataSink;
 class MemPool;
 class ObjectPool;
 class RowBatch;
@@ -43,6 +44,46 @@ class TPlanExecParams;
 class TPlanFragmentInstanceCtx;
 class TInsertStats;
 
+/// Configuration class for creating DataSink objects. It contains a subset of the static
+/// state of their corresponding DataSink, of which there is one instance per fragment.
+/// DataSink contains the runtime state and there can be up to MT_DOP instances of it per
+/// fragment.
+class DataSinkConfig {
+ public:
+  DataSinkConfig() = default;
+  virtual ~DataSinkConfig() {}
+
+  /// Create its corresponding DataSink. Place the sink in state->obj_pool().
+  virtual DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const = 0;
+
+  /// Pointer to the thrift data sink struct associated with this sink. Set in Init() and
+  /// owned by QueryState.
+  const TDataSink* tsink_ = nullptr;
+
+  /// The row descriptor for the rows consumed by the sink. Owned by root plan node of
+  /// plan tree, which feeds into this sink. Set in Init().
+  const RowDescriptor* input_row_desc_ = nullptr;
+
+  /// Output expressions to convert row batches onto output values.
+  /// Not used in some sub-classes.
+  std::vector<ScalarExpr*> output_exprs_;
+
+  /// Creates a new data sink config, allocated in state->obj_pool() and returned through
+  /// *sink, from the thrift sink object in fragment_ctx.
+  static Status CreateConfig(const TDataSink& thrift_sink, const RowDescriptor* row_desc,
+      RuntimeState* state, const DataSinkConfig** sink);
+
+ protected:
+  /// Sets reference to TDataSink and initializes the expressions. Returns error status on
+  /// failure. If overridden in subclass, must first call superclass's Init().
+  virtual Status Init(
+      const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state);
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(DataSinkConfig);
+};
+
 /// A data sink is an abstract interface for data sinks that consume RowBatches. E.g.
 /// a sink may write a HDFS table, send data across the network, or build hash tables
 /// for a join.
@@ -57,7 +98,7 @@ class DataSink {
   /// If this is the sink at the root of a fragment, 'sink_id' must be a unique ID for
   /// the sink for use in runtime profiles and other purposes. Otherwise this is a join
   /// build sink owned by an ExecNode and 'sink_id' must be -1.
-  DataSink(TDataSinkId sink_id, const RowDescriptor* row_desc, const string& name,
+  DataSink(TDataSinkId sink_id, const DataSinkConfig& sink_config, const string& name,
       RuntimeState* state);
   virtual ~DataSink();
 
@@ -87,12 +128,6 @@ class DataSink {
   /// Must be idempotent.
   virtual void Close(RuntimeState* state);
 
-  /// Creates a new data sink, allocated in pool and returned through *sink, from
-  /// thrift_sink.
-  static Status Create(const TPlanFragmentCtx& fragment_ctx,
-      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
-      const RowDescriptor* row_desc, RuntimeState* state, DataSink** sink);
-
   MemTracker* mem_tracker() const { return mem_tracker_.get(); }
   RuntimeProfile* profile() const { return profile_; }
   const std::vector<ScalarExprEvaluator*>& output_expr_evals() const {
@@ -104,6 +139,9 @@ class DataSink {
   static const char* const ROOT_PARTITION_KEY;
 
  protected:
+  /// Reference to the sink configuration shared across fragment instances.
+  const DataSinkConfig& sink_config_;
+
   /// Set to true after Close() has been called. Subclasses should check and set this in
   /// Close().
   bool closed_;
@@ -137,10 +175,6 @@ class DataSink {
   /// Not used in some sub-classes.
   std::vector<ScalarExpr*> output_exprs_;
   std::vector<ScalarExprEvaluator*> output_expr_evals_;
-
-  /// Initialize the expressions in the data sink and return error status on failure.
-  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
-      const TDataSink& tsink, RuntimeState* state);
 };
 } // namespace impala
 #endif
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index c907bcc..129ba25 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -31,15 +31,19 @@
 
 namespace impala {
 
-HBaseTableSink::HBaseTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
-    const TDataSink& tsink, RuntimeState* state)
-  : DataSink(sink_id, row_desc, "HBaseTableSink", state),
-    table_id_(tsink.table_sink.target_table_id),
-    table_desc_(NULL),
-    hbase_table_writer_(NULL) {
-  DCHECK(tsink.__isset.table_sink);
+DataSink* HBaseTableSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+  return state->obj_pool()->Add(new HBaseTableSink(sink_id, *this, state));
 }
 
+HBaseTableSink::HBaseTableSink(
+      TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state)
+  : DataSink(sink_id, sink_config, "HBaseTableSink", state),
+    table_id_(sink_config.tsink_->table_sink.target_table_id),
+    table_desc_(NULL),
+    hbase_table_writer_(NULL) {}
+
 Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   SCOPED_TIMER(profile()->total_time_counter());
diff --git a/be/src/exec/hbase-table-sink.h b/be/src/exec/hbase-table-sink.h
index d973ad4..4d702bd 100644
--- a/be/src/exec/hbase-table-sink.h
+++ b/be/src/exec/hbase-table-sink.h
@@ -31,12 +31,21 @@
 
 namespace impala {
 
+class HBaseTableSinkConfig : public DataSinkConfig {
+ public:
+  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+      RuntimeState* state) const override;
+
+  ~HBaseTableSinkConfig() override {}
+};
+
 /// Class to take row batches and send them to the HBaseTableWriter to
 /// eventually be written into an HBase table.
 class HBaseTableSink : public DataSink {
  public:
-  HBaseTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
-      const TDataSink& tsink, RuntimeState* state);
+  HBaseTableSink(
+      TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
   virtual Status Send(RuntimeState* state, RowBatch* batch);
   virtual Status FlushFinal(RuntimeState* state);
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 3e695b4..2674e72 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -53,23 +53,39 @@ using namespace strings;
 
 namespace impala {
 
-HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
-    const TDataSink& tsink, RuntimeState* state)
-  : DataSink(sink_id, row_desc, "HdfsTableSink", state),
+Status HdfsTableSinkConfig::Init(
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+  RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
+  DCHECK(tsink_->__isset.table_sink);
+  DCHECK(tsink_->table_sink.__isset.hdfs_table_sink);
+  RETURN_IF_ERROR(
+      ScalarExpr::Create(tsink_->table_sink.hdfs_table_sink.partition_key_exprs,
+          *input_row_desc_, state, &partition_key_exprs_));
+  return Status::OK();
+}
+
+DataSink* HdfsTableSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+  return state->obj_pool()->Add(
+      new HdfsTableSink(sink_id, *this, this->tsink_->table_sink.hdfs_table_sink, state));
+}
+
+HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sink_config,
+    const THdfsTableSink& hdfs_sink, RuntimeState* state)
+  : DataSink(sink_id, sink_config, "HdfsTableSink", state),
     table_desc_(nullptr),
     prototype_partition_(nullptr),
-    table_id_(tsink.table_sink.target_table_id),
+    table_id_(sink_config.tsink_->table_sink.target_table_id),
     skip_header_line_count_(
-        tsink.table_sink.hdfs_table_sink.__isset.skip_header_line_count ?
-            tsink.table_sink.hdfs_table_sink.skip_header_line_count :
-            0),
-    overwrite_(tsink.table_sink.hdfs_table_sink.overwrite),
-    input_is_clustered_(tsink.table_sink.hdfs_table_sink.input_is_clustered),
-    sort_columns_(tsink.table_sink.hdfs_table_sink.sort_columns),
-    current_clustered_partition_(nullptr) {
-  DCHECK(tsink.__isset.table_sink);
-  if (tsink.table_sink.hdfs_table_sink.__isset.write_id) {
-    write_id_ = tsink.table_sink.hdfs_table_sink.write_id;
+        hdfs_sink.__isset.skip_header_line_count ? hdfs_sink.skip_header_line_count : 0),
+    overwrite_(hdfs_sink.overwrite),
+    input_is_clustered_(hdfs_sink.input_is_clustered),
+    sort_columns_(hdfs_sink.sort_columns),
+    current_clustered_partition_(nullptr),
+    partition_key_exprs_(sink_config.partition_key_exprs_) {
+  if (hdfs_sink.__isset.write_id) {
+    write_id_ = hdfs_sink.write_id;
     DCHECK_GT(write_id_, 0);
   }
 }
@@ -82,15 +98,6 @@ OutputPartition::OutputPartition()
     partition_descriptor(nullptr),
     block_size(0) {}
 
-Status HdfsTableSink::Init(const vector<TExpr>& thrift_output_exprs,
-    const TDataSink& tsink, RuntimeState* state) {
-  RETURN_IF_ERROR(DataSink::Init(thrift_output_exprs, tsink, state));
-  DCHECK(tsink.__isset.table_sink);
-  RETURN_IF_ERROR(ScalarExpr::Create(tsink.table_sink.hdfs_table_sink.partition_key_exprs,
-      *row_desc_, state, &partition_key_exprs_));
-  return Status::OK();
-}
-
 Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   unique_id_str_ = PrintId(state->fragment_instance_id(), "-");
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 81d5d40..2e790fd 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -94,6 +94,22 @@ struct OutputPartition {
   OutputPartition();
 };
 
+class HdfsTableSinkConfig : public DataSinkConfig {
+ public:
+  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+      RuntimeState* state) const override;
+
+  /// Expressions for computing the target partitions to which a row is written.
+  std::vector<ScalarExpr*> partition_key_exprs_;
+
+  ~HdfsTableSinkConfig() override {}
+
+ protected:
+  Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+      RuntimeState* state) override;
+};
+
 /// The sink consumes all row batches of its child execution tree, and writes the
 /// evaluated output_exprs into temporary Hdfs files. The query coordinator moves the
 /// temporary files into their final locations after the sinks have finished executing.
@@ -134,8 +150,8 @@ struct OutputPartition {
 /// <table base dir>/<partition dirs>/<ACID base or delta directory>
 class HdfsTableSink : public DataSink {
  public:
-  HdfsTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
-      const TDataSink& tsink, RuntimeState* state);
+  HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sink_config,
+    const THdfsTableSink& hdfs_sink, RuntimeState* state);
 
   /// Prepares output_exprs and partition_key_exprs, and connects to HDFS.
   virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
@@ -167,10 +183,6 @@ class HdfsTableSink : public DataSink {
 
   std::string DebugString() const;
 
- protected:
-  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
-      const TDataSink& tsink, RuntimeState* state) WARN_UNUSED_RESULT;
-
  private:
   /// Initialises the filenames of a given output partition, and opens the temporary file.
   /// The partition key is derived from 'row'. If the partition will not have any rows
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index d4113e6..1372476 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -66,14 +66,20 @@ namespace impala {
 // Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693).
 const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024;
 
-KuduTableSink::KuduTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
-    const TDataSink& tsink, RuntimeState* state)
-  : DataSink(sink_id, row_desc, "KuduTableSink", state),
-    table_id_(tsink.table_sink.target_table_id),
-    sink_action_(tsink.table_sink.action),
-    kudu_table_sink_(tsink.table_sink.kudu_table_sink),
+DataSink* KuduTableSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+  return state->obj_pool()->Add(
+      new KuduTableSink(sink_id, *this, tsink_->table_sink, state));
+}
+
+KuduTableSink::KuduTableSink(TDataSinkId sink_id, const DataSinkConfig& sink_config,
+      const TTableSink& table_sink, RuntimeState* state)
+  : DataSink(sink_id, sink_config, "KuduTableSink", state),
+    table_id_(table_sink.target_table_id),
+    sink_action_(table_sink.action),
+    kudu_table_sink_(table_sink.kudu_table_sink),
     client_tracked_bytes_(0) {
-  DCHECK(tsink.__isset.table_sink);
   DCHECK(KuduIsAvailable());
 }
 
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index 2496640..5cde878 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -30,6 +30,15 @@
 
 namespace impala {
 
+class KuduTableSinkConfig : public DataSinkConfig {
+ public:
+  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+      RuntimeState* state) const override;
+
+  ~KuduTableSinkConfig() override {}
+};
+
 /// Sink that takes RowBatches and writes them into a Kudu table.
 ///
 /// The data is added to Kudu in Send(). The Kudu client is configured to automatically
@@ -53,8 +62,8 @@ namespace impala {
 /// status. All reported errors (ignored or not) will be logged via the RuntimeState.
 class KuduTableSink : public DataSink {
  public:
-  KuduTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
-      const TDataSink& tsink, RuntimeState* state);
+  KuduTableSink(TDataSinkId sink_id, const DataSinkConfig& sink_config,
+      const TTableSink& table_sink, RuntimeState* state);
 
   /// Prepares the expressions to be applied and creates a KuduSchema based on the
   /// expressions and KuduTableDescriptor.
diff --git a/be/src/exec/nested-loop-join-builder.cc b/be/src/exec/nested-loop-join-builder.cc
index a521fa9..4932a67 100644
--- a/be/src/exec/nested-loop-join-builder.cc
+++ b/be/src/exec/nested-loop-join-builder.cc
@@ -27,9 +27,17 @@
 
 using namespace impala;
 
-NljBuilder::NljBuilder(const RowDescriptor* row_desc, RuntimeState* state)
-  : DataSink(-1, row_desc, "Nested Loop Join Builder", state),
-    build_batch_cache_(row_desc, state->batch_size()) {}
+NljBuilder* NljBuilder::CreateSink(const RowDescriptor* row_desc, RuntimeState* state) {
+  ObjectPool* pool = state->obj_pool();
+  DataSinkConfig* sink_config = pool->Add(new NljBuilderConfig());
+  sink_config->tsink_ = pool->Add(new TDataSink());
+  sink_config->input_row_desc_ = row_desc;
+  return pool->Add(new NljBuilder(*sink_config, state));
+}
+
+NljBuilder::NljBuilder(const DataSinkConfig& sink_config, RuntimeState* state)
+  : DataSink(-1, sink_config, "Nested Loop Join Builder", state),
+    build_batch_cache_(row_desc_, state->batch_size()) {}
 
 Status NljBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
diff --git a/be/src/exec/nested-loop-join-builder.h b/be/src/exec/nested-loop-join-builder.h
index 8a03d1f..bdc3eb0 100644
--- a/be/src/exec/nested-loop-join-builder.h
+++ b/be/src/exec/nested-loop-join-builder.h
@@ -26,6 +26,19 @@
 
 namespace impala {
 
+/// Dummy class needed to create an instance of the sink.
+class NljBuilderConfig : public DataSinkConfig {
+ public:
+  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+      RuntimeState* state) const override {
+    DCHECK(false) << "Not Implemented";
+    return nullptr;
+  }
+
+  ~NljBuilderConfig() override {}
+};
+
 /// Builder for the NestedLoopJoinNode that accumulates the build-side rows for the join.
 /// Implements the DataSink interface but also exposes some methods for direct use by
 /// NestedLoopJoinNode.
@@ -37,7 +50,9 @@ namespace impala {
 /// is used and all data is deep copied into memory owned by the builder.
 class NljBuilder : public DataSink {
  public:
-  NljBuilder(const RowDescriptor* row_desc, RuntimeState* state);
+
+  /// To be used by the NestedLoopJoinNode to create an instance of this sink.
+  static NljBuilder* CreateSink(const RowDescriptor* row_desc, RuntimeState* state);
 
   /// Implementations of DataSink interface methods.
   virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
@@ -76,6 +91,8 @@ class NljBuilder : public DataSink {
   inline RowBatchList* copied_build_batches() { return &copied_build_batches_; }
 
  private:
+  NljBuilder(const DataSinkConfig& sink_config, RuntimeState* state);
+
   /// Deep copy all build batches in 'input_build_batches_' to 'copied_build_batches_'.
   /// Resets all the source batches and clears 'input_build_batches_'.
   /// If the memory limit is exceeded while copying batches, returns a
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index 6be017e..c4d8379 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -95,8 +95,7 @@ Status NestedLoopJoinNode::Open(RuntimeState* state) {
     DCHECK_EQ(builder_->copied_build_batches()->total_num_rows(), 0);
     build_batches_ = builder_->input_build_batches();
   } else {
-    RETURN_IF_ERROR(
-        BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get()));
+    RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_));
     build_batches_ = builder_->GetFinalBuildBatches();
     if (matching_build_rows_ != NULL) {
       RETURN_IF_ERROR(ResetMatchingBuildRows(state, build_batches_->total_num_rows()));
@@ -112,7 +111,7 @@ Status NestedLoopJoinNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(join_conjuncts_, state,
       pool_, expr_perm_pool(), expr_results_pool(), &join_conjunct_evals_));
-  builder_.reset(new NljBuilder(child(1)->row_desc(), state));
+  builder_ = NljBuilder::CreateSink(child(1)->row_desc(), state);
   RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
   runtime_profile()->PrependChild(builder_->profile());
 
diff --git a/be/src/exec/nested-loop-join-node.h b/be/src/exec/nested-loop-join-node.h
index e3cf2b3..d117ff8 100644
--- a/be/src/exec/nested-loop-join-node.h
+++ b/be/src/exec/nested-loop-join-node.h
@@ -66,8 +66,8 @@ class NestedLoopJoinNode : public BlockingJoinNode {
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
-  /// The build side rows of the join.
-  boost::scoped_ptr<NljBuilder> builder_;
+  /// The build side rows of the join. Created in Prepare() and owned by runtime state.
+  NljBuilder* builder_;
 
   /// Pointer to the RowBatchList (owned by 'builder_') that contains the batches to
   /// use during the probe phase.
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index a865e8f..6df580d 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -49,29 +49,45 @@ static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
 using namespace impala;
 using strings::Substitute;
 
-const char* PhjBuilder::LLVM_CLASS_NAME = "class.impala::PhjBuilder";
+Status PhjBuilderConfig::Init(
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+  return Status("Not Implemented.");
+}
 
-PhjBuilder::PhjBuilder(int join_node_id, const string& join_node_label,
-    TJoinOp::type join_op, const RowDescriptor* build_row_desc, RuntimeState* state,
-    BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
-    int64_t max_row_buffer_size)
-  : DataSink(-1, build_row_desc,
-        Substitute("Hash Join Builder (join_node_id=$0)", join_node_id), state),
-    runtime_state_(state),
-    join_node_id_(join_node_id),
-    join_node_label_(join_node_label),
-    join_op_(join_op),
-    buffer_pool_client_(buffer_pool_client),
-    spillable_buffer_size_(spillable_buffer_size),
-    max_row_buffer_size_(max_row_buffer_size) {}
+DataSink* PhjBuilderConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+  DCHECK(false) << "Not Implemented";
+  return nullptr;
+}
+
+PhjBuilder* PhjBuilderConfig::CreateSink(BufferPool::ClientHandle* buffer_pool_client,
+    const std::string& join_node_label, int64_t spillable_buffer_size,
+    int64_t max_row_buffer_size, RuntimeState* state) const {
+  ObjectPool* pool = state->obj_pool();
+  return pool->Add(new PhjBuilder(*this, buffer_pool_client, join_node_label,
+      spillable_buffer_size, max_row_buffer_size, state));
+}
+
+Status PhjBuilderConfig::CreateConfig(RuntimeState* state, int join_node_id,
+    TJoinOp::type join_op, const RowDescriptor* build_row_desc,
+    const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+    const std::vector<TRuntimeFilterDesc>& filters, const PhjBuilderConfig** sink) {
+  ObjectPool* pool = state->obj_pool();
+  TDataSink* tsink = pool->Add(new TDataSink()); // just a dummy object.
+  PhjBuilderConfig* data_sink = pool->Add(new PhjBuilderConfig());
+  RETURN_IF_ERROR(data_sink->Init(
+      state, *tsink, join_node_id, join_op, build_row_desc, eq_join_conjuncts, filters));
+  *sink = data_sink;
+  return Status::OK();
+}
 
-Status PhjBuilder::InitExprsAndFilters(RuntimeState* state,
+Status PhjBuilderConfig::InitExprsAndFilters(RuntimeState* state,
     const vector<TEqJoinCondition>& eq_join_conjuncts,
     const vector<TRuntimeFilterDesc>& filter_descs) {
   for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) {
     ScalarExpr* build_expr;
     RETURN_IF_ERROR(
-        ScalarExpr::Create(eq_join_conjunct.right, *row_desc_, state, &build_expr));
+        ScalarExpr::Create(eq_join_conjunct.right, *input_row_desc_, state, &build_expr));
     build_exprs_.push_back(build_expr);
     is_not_distinct_from_.push_back(eq_join_conjunct.is_not_distinct_from);
   }
@@ -92,15 +108,45 @@ Status PhjBuilder::InitExprsAndFilters(RuntimeState* state,
     if (it == filters_produced.end()) continue;
     ScalarExpr* filter_expr;
     RETURN_IF_ERROR(
-        ScalarExpr::Create(filter_desc.src_expr, *row_desc_, state, &filter_expr));
+        ScalarExpr::Create(filter_desc.src_expr, *input_row_desc_, state, &filter_expr));
     filter_exprs_.push_back(filter_expr);
+    filter_descs_.push_back(filter_desc);
+  }
+  return Status::OK();
+}
+
+Status PhjBuilderConfig::Init(RuntimeState* state, const TDataSink& tsink,
+    int join_node_id, TJoinOp::type join_op, const RowDescriptor* build_row_desc,
+    const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+    const std::vector<TRuntimeFilterDesc>& filters) {
+  RETURN_IF_ERROR(DataSinkConfig::Init(tsink, build_row_desc, state));
+  join_node_id_ = join_node_id;
+  join_op_ = join_op;
+  return InitExprsAndFilters(state, eq_join_conjuncts, filters);
+}
+
+const char* PhjBuilder::LLVM_CLASS_NAME = "class.impala::PhjBuilder";
 
-    // TODO: Move to Prepare().
+PhjBuilder::PhjBuilder(const PhjBuilderConfig& sink_config,
+    BufferPool::ClientHandle* buffer_pool_client, const std::string& join_node_label,
+    int64_t spillable_buffer_size, int64_t max_row_buffer_size, RuntimeState* state)
+  : DataSink(-1, sink_config,
+        Substitute("Hash Join Builder (join_node_id=$0)", sink_config.join_node_id_),
+        state),
+    runtime_state_(state),
+    join_node_id_(sink_config.join_node_id_),
+    join_node_label_(join_node_label),
+    join_op_(sink_config.join_op_),
+    buffer_pool_client_(buffer_pool_client),
+    spillable_buffer_size_(spillable_buffer_size),
+    max_row_buffer_size_(max_row_buffer_size),
+    build_exprs_(sink_config.build_exprs_),
+    is_not_distinct_from_(sink_config.is_not_distinct_from_),
+    filter_exprs_(sink_config.filter_exprs_) {
+  for (const TRuntimeFilterDesc& filter_desc : sink_config.filter_descs_) {
     filter_ctxs_.emplace_back();
-    // TODO: IMPALA-4400 - implement local aggregation of runtime filters.
     filter_ctxs_.back().filter = state->filter_bank()->RegisterFilter(filter_desc, true);
   }
-  return Status::OK();
 }
 
 Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 9ac23b1..0dec0b8 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -35,11 +35,76 @@
 
 namespace impala {
 
+class PhjBuilder;
 class RowDescriptor;
 class RuntimeState;
 class ScalarExpr;
 class ScalarExprEvaluator;
 
+/// Partitioned Hash Join Builder Config class. This has a few extra methods to be used
+/// directly by the PartitionedHashJoinPlanNode. Since it is expected to be created and
+/// used only by PartitionedHashJoinPlanNode, the DataSinkConfig::Init() and
+/// DataSinkConfig::CreateSink() are not implemented for it.
+class PhjBuilderConfig : public DataSinkConfig {
+ public:
+    DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+      RuntimeState* state) const override;
+
+  /// Creates an instance of PhjBuilder data sink in the state's object pool. To be used
+  /// only by PartitionedHashJoinPlanNode.
+  PhjBuilder* CreateSink(BufferPool::ClientHandle* buffer_pool_client,
+      const std::string& join_node_label, int64_t spillable_buffer_size,
+      int64_t max_row_buffer_size, RuntimeState* state) const;
+
+  /// Creates an instance of this class in the state's object pool. To be used only by
+  /// PartitionedHashJoinPlanNode.
+  static Status CreateConfig(RuntimeState* state, int join_node_id, TJoinOp::type join_op,
+      const RowDescriptor* build_row_desc,
+      const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+      const std::vector<TRuntimeFilterDesc>& filters, const PhjBuilderConfig** sink);
+
+  ~PhjBuilderConfig() override {}
+
+  /// The ID of the plan join node this is associated with.
+  int join_node_id_;
+
+  /// The join operation this is building for.
+  TJoinOp::type join_op_;
+
+  /// Expressions over input rows for hash table build.
+  std::vector<ScalarExpr*> build_exprs_;
+
+  /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS
+  /// NOT DISTINCT FROM, rather than equality.
+  /// Set in InitExprsAndFilters() and constant thereafter.
+  std::vector<bool> is_not_distinct_from_;
+
+  /// Expressions for evaluating input rows for insertion into runtime filters.
+  /// Only includes exprs for filters produced by this builder.
+  std::vector<ScalarExpr*> filter_exprs_;
+
+  /// The runtime filter descriptors of filters produced by this builder.
+  vector<TRuntimeFilterDesc> filter_descs_;
+
+ protected:
+  Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+      RuntimeState* state) override;
+
+ private:
+  /// Helper method used by CreateConfig()
+  Status Init(RuntimeState* state, const TDataSink& tsink, int join_node_id,
+      TJoinOp::type join_op, const RowDescriptor* build_row_desc,
+      const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+      const std::vector<TRuntimeFilterDesc>& filters);
+
+  /// Initializes the build and filter expressions and creates a copy of the filter
+  /// descriptors that will be generated by this sink.
+  Status InitExprsAndFilters(RuntimeState* state,
+      const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+      const std::vector<TRuntimeFilterDesc>& filters);
+};
+
 /// See partitioned-hash-join-node.h for explanation of the top-level algorithm and how
 /// these states fit in it.
 enum class HashJoinState {
@@ -114,14 +179,9 @@ class PhjBuilder : public DataSink {
 
   using PartitionId = int;
 
-  PhjBuilder(int join_node_id, const std::string& join_node_label, TJoinOp::type join_op,
-      const RowDescriptor* build_row_desc, RuntimeState* state,
-      BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
-      int64_t max_row_buffer_size);
-
-  Status InitExprsAndFilters(RuntimeState* state,
-      const std::vector<TEqJoinCondition>& eq_join_conjuncts,
-      const std::vector<TRuntimeFilterDesc>& filters) WARN_UNUSED_RESULT;
+  PhjBuilder(const PhjBuilderConfig& sink_config,
+      BufferPool::ClientHandle* buffer_pool_client, const std::string& join_node_label,
+      int64_t spillable_buffer_size, int64_t max_row_buffer_size, RuntimeState* state);
 
   /// Implementations of DataSink interface methods.
   virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 737a963..a9fe4e9 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -76,24 +76,34 @@ Status PartitionedHashJoinPlanNode::Init(const TPlanNode& tnode, RuntimeState* s
       full_row_desc, state, &other_join_conjuncts_));
   DCHECK(tnode.hash_join_node.join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN
       || eq_join_conjuncts.size() == 1);
+
+  RETURN_IF_ERROR(PhjBuilderConfig::CreateConfig(state, tnode_->node_id,
+      tnode_->hash_join_node.join_op, children_[1]->row_descriptor_, eq_join_conjuncts,
+      tnode_->runtime_filters, &phj_builder_config));
   return Status::OK();
 }
 
 Status PartitionedHashJoinPlanNode::CreateExecNode(
     RuntimeState* state, ExecNode** node) const {
   ObjectPool* pool = state->obj_pool();
-  *node = pool->Add(new PartitionedHashJoinNode(pool, *this, state->desc_tbl()));
+  *node = pool->Add(new PartitionedHashJoinNode(state, *this, state->desc_tbl()));
   return Status::OK();
 }
 
-PartitionedHashJoinNode::PartitionedHashJoinNode(ObjectPool* pool,
+PartitionedHashJoinNode::PartitionedHashJoinNode(RuntimeState* state,
     const PartitionedHashJoinPlanNode& pnode, const DescriptorTbl& descs)
   : BlockingJoinNode("PartitionedHashJoinNode", pnode.tnode_->hash_join_node.join_op,
-        pool, pnode, descs) {
+        state->obj_pool(), pnode, descs),
+    build_exprs_(pnode.build_exprs_),
+    probe_exprs_(pnode.probe_exprs_),
+    other_join_conjuncts_(pnode.other_join_conjuncts_) {
   memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
-  build_exprs_ = pnode.build_exprs_;
-  probe_exprs_ = pnode.probe_exprs_;
-  other_join_conjuncts_ = pnode.other_join_conjuncts_;
+  // TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is
+  // owned by this node, but duplicates some state (exprs, etc) in anticipation of it
+  // being separated out further.
+  builder_ = pnode.phj_builder_config->CreateSink(buffer_pool_client(), label(),
+      resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size,
+      state);
 }
 
 PartitionedHashJoinNode::~PartitionedHashJoinNode() {
@@ -106,21 +116,6 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
 
   RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
   runtime_state_ = state;
-  const vector<TEqJoinCondition>& eq_join_conjuncts =
-      plan_node_.tnode_->hash_join_node.eq_join_conjuncts;
-  // TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is
-  // owned by this node, but duplicates some state (exprs, etc) in anticipation of it
-  // being separated out further.
-  // TODO: Move the builder creation on Expr Init into the constructor once the PlanRoot
-  // Equivalent of a sink is implemented. build_exprs_ and filter_exprs can be passed
-  // directly from those generated in Phj node, only thing left to do is register the
-  // filters.
-  builder_.reset(new PhjBuilder(id(), label(), join_op_, child(1)->row_desc(), state,
-      buffer_pool_client(), resource_profile_.spillable_buffer_size,
-      resource_profile_.max_row_buffer_size));
-  RETURN_IF_ERROR(builder_->InitExprsAndFilters(
-      state, eq_join_conjuncts, plan_node_.tnode_->runtime_filters));
-
   RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
   runtime_profile()->PrependChild(builder_->profile());
 
@@ -186,7 +181,7 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
   // are cleared in QueryMaintenance().
   probe_expr_results_pool_->Clear();
 
-  RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get()));
+  RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_));
 
   build_hash_partitions_ = builder_->BeginInitialProbe(buffer_pool_client());
   RETURN_IF_ERROR(PrepareForPartitionedProbe());
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 38c274f..9c6f16b 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -52,6 +52,10 @@ class PartitionedHashJoinPlanNode : public BlockingJoinPlanNode {
 
   /// Non-equi-join conjuncts from the ON clause.
   std::vector<ScalarExpr*> other_join_conjuncts_;
+
+  /// Data sink config object for creating a PhjBuilder that will eventually be used by
+  /// the exec node.
+  const PhjBuilderConfig* phj_builder_config;
 };
 
 /// Operator to perform partitioned hash join, spilling to disk as necessary. This
@@ -128,7 +132,7 @@ class PartitionedHashJoinPlanNode : public BlockingJoinPlanNode {
 
 class PartitionedHashJoinNode : public BlockingJoinNode {
  public:
-  PartitionedHashJoinNode(ObjectPool* pool, const PartitionedHashJoinPlanNode& pnode,
+  PartitionedHashJoinNode(RuntimeState* state, const PartitionedHashJoinPlanNode& pnode,
       const DescriptorTbl& descs);
   virtual ~PartitionedHashJoinNode();
 
@@ -552,8 +556,8 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// State of the probing algorithm. Used to drive the state machine in GetNext().
   ProbeState probe_state_ = ProbeState::PROBE_COMPLETE;
 
-  /// The build-side of the join. Initialized in Prepare().
-  boost::scoped_ptr<PhjBuilder> builder_;
+  /// The build-side of the join. Initialized in constructor and owned by runtime state.
+  PhjBuilder* builder_;
 
   /// Last set of hash partitions obtained from builder_. Only valid when the
   /// builder's state is PARTITIONING_PROBE or REPARTITIONING_PROBE.
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 45368f9..bce6324 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -17,6 +17,8 @@
 
 #include "exec/plan-root-sink.h"
 
+#include "exec/buffered-plan-root-sink.h"
+#include "exec/blocking-plan-root-sink.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/scalar-expr.h"
 #include "runtime/row-batch.h"
@@ -33,9 +35,21 @@ using boost::mutex;
 
 namespace impala {
 
+DataSink* PlanRootSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+  ObjectPool* pool = state->obj_pool();
+  if (state->query_options().spool_query_results) {
+    return pool->Add(new BufferedPlanRootSink(
+        sink_id, *this, state, fragment_instance_ctx.debug_options));
+  } else {
+    return pool->Add(new BlockingPlanRootSink(sink_id, *this, state));
+  }
+}
+
 PlanRootSink::PlanRootSink(
-    TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state)
-  : DataSink(sink_id, row_desc, "PLAN_ROOT_SINK", state),
+    TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state)
+  : DataSink(sink_id, sink_config, "PLAN_ROOT_SINK", state),
     num_rows_produced_limit_(state->query_options().num_rows_produced_limit) {}
 
 PlanRootSink::~PlanRootSink() {}
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index ad4dc3e..ecc03aa 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -27,6 +27,15 @@ class RowBatch;
 class QueryResultSet;
 class ScalarExprEvaluator;
 
+class PlanRootSinkConfig : public DataSinkConfig {
+ public:
+  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+      RuntimeState* state) const override;
+
+  ~PlanRootSinkConfig() override {}
+};
+
 /// Sink which manages the handoff between a 'sender' (a fragment instance) that produces
 /// batches by calling Send(), and a 'consumer' (e.g. the coordinator) which consumes rows
 /// formed by computing a set of output expressions over the input batches, by calling
@@ -54,7 +63,8 @@ class ScalarExprEvaluator;
 /// ensures that this outlives any calls to Send() and GetNext(), respectively.
 class PlanRootSink : public DataSink {
  public:
-  PlanRootSink(TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state);
+  PlanRootSink(
+    TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state);
   virtual ~PlanRootSink();
 
   /// Called before Send(), Open(), or Close(). Performs any additional setup necessary,
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 3121ee9..90bbf48 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -526,20 +526,16 @@ class DataStreamTest : public testing::Test {
     RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
     VLOG_QUERY << "create sender " << sender_num;
     const TDataSink& sink = GetSink(partition_type);
+    const DataSinkConfig* data_sink = nullptr;
+    EXPECT_OK(DataSinkConfig::CreateConfig(sink, row_desc_, &state, &data_sink));
 
     // We create an object of the base class DataSink and cast to the appropriate sender
     // according to the 'is_thrift' option.
     scoped_ptr<DataSink> sender;
 
-    TExprNode expr_node;
-    expr_node.node_type = TExprNodeType::SLOT_REF;
-    TExpr output_exprs;
-    output_exprs.nodes.push_back(expr_node);
-
-    sender.reset(new KrpcDataStreamSender(-1,
-        sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, &state));
-    EXPECT_OK(static_cast<KrpcDataStreamSender*>(
-        sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
+    sender.reset(new KrpcDataStreamSender(-1, sender_num,
+        *(static_cast<const KrpcDataStreamSenderConfig*>(data_sink)),
+        data_sink->tsink_->stream_sink, dest_, channel_buffer_size, &state));
     EXPECT_OK(sender->Prepare(&state, &tracker_));
     EXPECT_OK(sender->Open(&state));
     scoped_ptr<RowBatch> batch(CreateRowBatch());
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 02bc9cf..bc34d17 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -215,8 +215,10 @@ Status FragmentInstanceState::Prepare() {
 
   // prepare sink_
   DCHECK(fragment_ctx_.fragment.__isset.output_sink);
-  RETURN_IF_ERROR(DataSink::Create(fragment_ctx_, instance_ctx_, exec_tree_->row_desc(),
-      runtime_state_, &sink_));
+  const TDataSink& thrift_sink = fragment_ctx_.fragment.output_sink;
+  RETURN_IF_ERROR(DataSinkConfig::CreateConfig(
+      thrift_sink, plan_tree_->row_descriptor_, runtime_state_, &sink_config_));
+  sink_ = sink_config_->CreateSink(fragment_ctx_, instance_ctx_, runtime_state_);
   RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker()));
   RuntimeProfile* sink_profile = sink_->profile();
   if (sink_profile != nullptr) profile()->AddChild(sink_profile);
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 7b059c6..69e081b 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -51,6 +51,7 @@ class PlanNode;
 class PlanRootSink;
 class Thread;
 class DataSink;
+class DataSinkConfig;
 class RuntimeState;
 
 /// FragmentInstanceState handles all aspects of the execution of a single plan fragment
@@ -154,6 +155,7 @@ class FragmentInstanceState {
   RuntimeState* runtime_state_ = nullptr;  // lives in obj_pool()
   /// Lives in obj_pool(). Not mutated after being initialized.
   const PlanNode* plan_tree_ = nullptr;
+  const DataSinkConfig* sink_config_ = nullptr;
 
   /// A 'fake mutex' to detect any race condition in accessing 'report_seq_no_' below.
   /// There should be only one thread doing status report at the same time.
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index f1a9951..91cac2b 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -55,6 +55,10 @@
 
 #include "common/names.h"
 
+DEFINE_int64(data_stream_sender_buffer_size, 16 * 1024,
+    "(Advanced) Max size in bytes which a row batch in a data stream sender's channel "
+    "can accumulate before the row batch is sent over the wire.");
+
 using std::condition_variable_any;
 using namespace apache::thrift;
 using kudu::rpc::RpcController;
@@ -71,6 +75,29 @@ const char* KrpcDataStreamSender::HASH_ROW_SYMBOL =
 const char* KrpcDataStreamSender::LLVM_CLASS_NAME = "class.impala::KrpcDataStreamSender";
 const char* KrpcDataStreamSender::TOTAL_BYTES_SENT_COUNTER = "TotalBytesSent";
 
+Status KrpcDataStreamSenderConfig::Init(
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+  RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
+  DCHECK(tsink_->__isset.stream_sink);
+  auto& partition_type = tsink_->stream_sink.output_partition.type;
+  if (partition_type == TPartitionType::HASH_PARTITIONED
+      || partition_type == TPartitionType::KUDU) {
+    RETURN_IF_ERROR(
+        ScalarExpr::Create(tsink_->stream_sink.output_partition.partition_exprs,
+            *input_row_desc_, state, &partition_exprs_));
+  }
+  return Status::OK();
+}
+
+DataSink* KrpcDataStreamSenderConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+  // We have one fragment per sink, so we can use the fragment index as the sink ID.
+  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+  return state->obj_pool()->Add(new KrpcDataStreamSender(sink_id,
+      fragment_instance_ctx.sender_id, *this, tsink_->stream_sink,
+      fragment_ctx.destinations, FLAGS_data_stream_sender_buffer_size, state));
+}
+
 // A datastream sender may send row batches to multiple destinations. There is one
 // channel for each destination.
 //
@@ -669,14 +696,15 @@ void KrpcDataStreamSender::Channel::Teardown(RuntimeState* state) {
 }
 
 KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
-    const RowDescriptor* row_desc, const TDataStreamSink& sink,
-    const vector<TPlanFragmentDestination>& destinations, int per_channel_buffer_size,
-    RuntimeState* state)
-  : DataSink(sink_id, row_desc,
+    const KrpcDataStreamSenderConfig& sink_config, const TDataStreamSink& sink,
+    const std::vector<TPlanFragmentDestination>& destinations,
+    int per_channel_buffer_size, RuntimeState* state)
+  : DataSink(sink_id, sink_config,
         Substitute("KrpcDataStreamSender (dst_id=$0)", sink.dest_node_id), state),
     sender_id_(sender_id),
     partition_type_(sink.output_partition.type),
     per_channel_buffer_size_(per_channel_buffer_size),
+    partition_exprs_(sink_config.partition_exprs_),
     dest_node_id_(sink.dest_node_id),
     next_unknown_partition_(0) {
   DCHECK_GT(destinations.size(), 0);
@@ -687,16 +715,17 @@ KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
 
   for (int i = 0; i < destinations.size(); ++i) {
     channels_.push_back(
-        new Channel(this, row_desc, destinations[i].thrift_backend.hostname,
+        new Channel(this, row_desc_, destinations[i].thrift_backend.hostname,
             destinations[i].krpc_backend, destinations[i].fragment_instance_id,
             sink.dest_node_id, per_channel_buffer_size));
   }
 
-  if (partition_type_ == TPartitionType::UNPARTITIONED ||
-      partition_type_ == TPartitionType::RANDOM) {
+  if (partition_type_ == TPartitionType::UNPARTITIONED
+      || partition_type_ == TPartitionType::RANDOM) {
     // Randomize the order we open/transmit to channels to avoid thundering herd problems.
     random_shuffle(channels_.begin(), channels_.end());
   }
+
 }
 
 KrpcDataStreamSender::~KrpcDataStreamSender() {
@@ -707,18 +736,6 @@ KrpcDataStreamSender::~KrpcDataStreamSender() {
   }
 }
 
-Status KrpcDataStreamSender::Init(const vector<TExpr>& thrift_output_exprs,
-    const TDataSink& tsink, RuntimeState* state) {
-  SCOPED_TIMER(profile_->total_time_counter());
-  DCHECK(tsink.__isset.stream_sink);
-  if (partition_type_ == TPartitionType::HASH_PARTITIONED ||
-      partition_type_ == TPartitionType::KUDU) {
-    RETURN_IF_ERROR(ScalarExpr::Create(tsink.stream_sink.output_partition.partition_exprs,
-        *row_desc_, state, &partition_exprs_));
-  }
-  return Status::OK();
-}
-
 Status KrpcDataStreamSender::Prepare(
     RuntimeState* state, MemTracker* parent_mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index d0f9848..0e5d6d6 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -39,6 +39,23 @@ class TDataStreamSink;
 class TNetworkAddress;
 class TPlanFragmentDestination;
 
+class KrpcDataStreamSenderConfig : public DataSinkConfig {
+ public:
+  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+      RuntimeState* state) const override;
+
+  /// Expressions of partition keys. They are used to compute the
+  /// per-row partition values for a shuffling exchange.
+  std::vector<ScalarExpr*> partition_exprs_;
+
+  ~KrpcDataStreamSenderConfig() override {}
+
+ protected:
+  Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+      RuntimeState* state) override;
+};
+
 /// Single sender of an m:n data stream.
 ///
 /// Row batch data is routed to destinations based on the provided partitioning
@@ -47,21 +64,19 @@ class TPlanFragmentDestination;
 ///
 /// TODO: capture stats that describe distribution of rows/data volume
 /// across channels.
-/// TODO: create a PlanNode equivalent class for DataSink.
 class KrpcDataStreamSender : public DataSink {
  public:
-  /// Constructs a sender according to the output specification (tsink), sending to the
+  /// Constructs a sender according to the config (sink_config), sending to the
   /// given destinations:
   /// 'sender_id' identifies this sender instance, and is unique within a fragment.
-  /// 'row_desc' is the descriptor of the tuple row. It must out-live the sink.
   /// 'destinations' are the receivers' network addresses. There is one channel for each
   /// destination.
   /// 'per_channel_buffer_size' is the soft limit in bytes of the buffering into the
   /// per-channel's accumulating row batch before it will be sent.
   /// NOTE: supported partition types are UNPARTITIONED (broadcast), HASH_PARTITIONED,
   /// and RANDOM.
-  KrpcDataStreamSender(TDataSinkId sink_id, int sender_id, const RowDescriptor* row_desc,
-      const TDataStreamSink& tsink,
+  KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
+      const KrpcDataStreamSenderConfig& sink_config, const TDataStreamSink& sink,
       const std::vector<TPlanFragmentDestination>& destinations,
       int per_channel_buffer_size, RuntimeState* state);
 
@@ -102,11 +117,6 @@ class KrpcDataStreamSender : public DataSink {
  protected:
   friend class DataStreamTest;
 
-  /// Initializes any partitioning expressions based on 'thrift_output_exprs' and stores
-  /// them in 'partition_exprs_'. Returns error status if the initialization failed.
-  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
-      const TDataSink& tsink, RuntimeState* state) override;
-
   /// Returns total number of bytes sent. If batches are broadcast to multiple receivers,
   /// they are counted once per receiver.
   int64_t GetNumDataBytesSent() const;


[impala] 01/02: IMPALA-9035: Simplify casting string to timestamp.

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6e3d1216f1680e7fca4b5979f182a19673018a2f
Author: xuzhou <pe...@gmail.com>
AuthorDate: Thu Dec 12 17:19:26 2019 +0800

    IMPALA-9035: Simplify casting string to timestamp.
    
    This change will help with queries generated by some BI tools.
    Case 1:
    Simplify 'string -> bigint -> timestamp' TO 'string -> timestamp':
    cast(unix_timestamp('timestr') as timestamp) ->
    cast('timestr' as timestamp)
    Case 2:
    Simplify 'string[fmt] -> bigint -> timestamp' TO 'string -> timestamp':
    cast(unix_timestamp('timestr', 'fmt') as timestamp) ->
    to_timestamp('timestr', 'fmt')
    
    Tests:
    Add front-end tests in ExprRewriteRulesTest.
    
    Change-Id: I4ed72d6e7886eaf50d2be60cf45170ffaef5e72d
    Reviewed-on: http://gerrit.cloudera.org:8080/14896
    Reviewed-by: Attila Jeges <at...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../java/org/apache/impala/analysis/Analyzer.java  |  2 +
 .../rewrite/SimplifyCastStringToTimestamp.java     | 77 ++++++++++++++++++++++
 .../impala/analysis/ExprRewriteRulesTest.java      | 16 +++++
 3 files changed, 95 insertions(+)

diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 1ef542b..fd85322 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -71,6 +71,7 @@ import org.apache.impala.rewrite.FoldConstantsRule;
 import org.apache.impala.rewrite.NormalizeBinaryPredicatesRule;
 import org.apache.impala.rewrite.NormalizeCountStarRule;
 import org.apache.impala.rewrite.NormalizeExprsRule;
+import org.apache.impala.rewrite.SimplifyCastStringToTimestamp;
 import org.apache.impala.rewrite.SimplifyConditionalsRule;
 import org.apache.impala.rewrite.SimplifyDistinctFromRule;
 import org.apache.impala.service.FeSupport;
@@ -483,6 +484,7 @@ public class Analyzer {
         rules.add(EqualityDisjunctsToInRule.INSTANCE);
         rules.add(NormalizeCountStarRule.INSTANCE);
         rules.add(SimplifyDistinctFromRule.INSTANCE);
+        rules.add(SimplifyCastStringToTimestamp.INSTANCE);
       }
       exprRewriter_ = new ExprRewriter(rules);
     }
diff --git a/fe/src/main/java/org/apache/impala/rewrite/SimplifyCastStringToTimestamp.java b/fe/src/main/java/org/apache/impala/rewrite/SimplifyCastStringToTimestamp.java
new file mode 100644
index 0000000..af6ffd8
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/rewrite/SimplifyCastStringToTimestamp.java
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.rewrite;
+
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.CastExpr;
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.FunctionCallExpr;
+import org.apache.impala.analysis.FunctionName;
+import org.apache.impala.analysis.TypeDef;
+import org.apache.impala.common.AnalysisException;
+import com.google.common.collect.Lists;
+
+
+/**
+ * Rewrite rule that drops the intermediate bigint when an explicit cast to
+ * TIMESTAMP wraps a call to unix_timestamp() whose argument(s) are strings.
+ * Case 1, 'string -> bigint -> timestamp' becomes 'string -> timestamp':
+ * cast(unix_timestamp('timestr') as timestamp) -> cast('timestr' as timestamp)
+ * Case 2, 'string[fmt] -> bigint -> timestamp' becomes 'string -> timestamp':
+ * cast(unix_timestamp('timestr', 'fmt') as timestamp) -> to_timestamp('timestr', 'fmt')
+ * Implicit casts and every other expression are returned as is.
+ */
+
+public class SimplifyCastStringToTimestamp implements ExprRewriteRule {
+  public static ExprRewriteRule INSTANCE = new SimplifyCastStringToTimestamp();
+
+  @Override
+  public Expr apply(Expr expr, Analyzer analyzer) throws AnalysisException {
+    if (expr instanceof CastExpr &&
+        !((CastExpr)expr).isImplicit() &&
+        expr.getChild(0) instanceof FunctionCallExpr) {
+      if (!expr.isAnalyzed())
+        expr.analyze(analyzer);
+
+      FunctionCallExpr fce = (FunctionCallExpr)expr.getChild(0);
+      if (!expr.getType().isTimestamp() ||
+          !fce.getFnName().getFunction().equalsIgnoreCase("unix_timestamp"))
+        return expr;
+
+      Expr simplifiedExpr = null;
+      if (fce.getChildren().size() == 1 &&
+          fce.getChild(0).getType().isStringType()) {
+        // Case 1: a single string argument; cast that argument to the target type.
+        simplifiedExpr = new CastExpr(new TypeDef(expr.getType()), fce.getChild(0));
+      } else if (fce.getChildren().size() == 2 &&
+                 fce.getChild(0).getType().isStringType() &&
+                 fce.getChild(1).getType().isStringType()) {
+        // Case 2: string value and string format; pass both to to_timestamp().
+        simplifiedExpr = new FunctionCallExpr(new FunctionName("to_timestamp"),
+            Lists.newArrayList(fce.getChildren()));
+      }
+
+      if (simplifiedExpr != null) {
+        simplifiedExpr.analyze(analyzer);
+        return simplifiedExpr;
+      }
+    }
+
+    return expr;
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java b/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
index 40e6b5b..0635210 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
@@ -40,6 +40,7 @@ import org.apache.impala.rewrite.FoldConstantsRule;
 import org.apache.impala.rewrite.NormalizeBinaryPredicatesRule;
 import org.apache.impala.rewrite.NormalizeCountStarRule;
 import org.apache.impala.rewrite.NormalizeExprsRule;
+import org.apache.impala.rewrite.SimplifyCastStringToTimestamp;
 import org.apache.impala.rewrite.SimplifyConditionalsRule;
 import org.apache.impala.rewrite.SimplifyDistinctFromRule;
 import org.junit.BeforeClass;
@@ -770,6 +771,21 @@ public class ExprRewriteRulesTest extends FrontendTestBase {
     RewritesOk("if(bool_col <=> NULL, 1, 2)", rules, null);
   }
 
+  @Test
+  public void testSimplifyCastStringToTimestamp() throws ImpalaException {
+    ExprRewriteRule rule = SimplifyCastStringToTimestamp.INSTANCE;
+
+    // Rewritten: unix_timestamp() is called with string argument(s).
+    RewritesOk("cast(unix_timestamp(date_string_col) as timestamp)", rule,
+        "CAST(date_string_col AS TIMESTAMP)");
+    RewritesOk("cast(unix_timestamp(date_string_col, 'yyyy-MM-dd') as timestamp)", rule,
+        "to_timestamp(date_string_col, 'yyyy-MM-dd')");
+
+    // Not rewritten: the argument is not a string, or there is no argument.
+    RewritesOk("cast(unix_timestamp(timestamp_col) as timestamp)", rule, null);
+    RewritesOk("cast(unix_timestamp() as timestamp)", rule, null);
+  }
+
   /**
    * NULLIF gets converted to an IF, and has cases where
    * it can be further simplified via SimplifyDistinctFromRule.