You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by li...@apache.org on 2017/11/14 07:57:25 UTC

hive git commit: HIVE-17976: HoS: don't set output collector if there's no data to process (Rui reviewed by Xuefu)

Repository: hive
Updated Branches:
  refs/heads/master d5f552528 -> 3bfcfdde0


HIVE-17976: HoS: don't set output collector if there's no data to process (Rui reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3bfcfdde
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3bfcfdde
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3bfcfdde

Branch: refs/heads/master
Commit: 3bfcfdde0c0be2aab1afdf5b1bc71fdcc9e77360
Parents: d5f5525
Author: Rui Li <li...@apache.org>
Authored: Tue Nov 14 15:57:20 2017 +0800
Committer: Rui Li <li...@apache.org>
Committed: Tue Nov 14 15:57:20 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java  |  7 +++++--
 .../hadoop/hive/ql/exec/spark/SparkRecordHandler.java     |  2 ++
 .../hive/ql/exec/spark/SparkReduceRecordHandler.java      | 10 +++++++---
 3 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3bfcfdde/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index 8333cf5..c177a43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -94,7 +94,6 @@ public class SparkMapRecordHandler extends SparkRecordHandler {
       mo.initializeLocalWork(jc);
       mo.initializeMapOperator(jc);
 
-      OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
       mo.setReporter(rp);
 
       if (localWork == null) {
@@ -124,6 +123,10 @@ public class SparkMapRecordHandler extends SparkRecordHandler {
 
   @Override
   public void processRow(Object key, Object value) throws IOException {
+    if (!anyRow) {
+      OperatorUtils.setChildrenCollector(mo.getChildOperators(), oc);
+      anyRow = true;
+    }
     // reset the execContext for each new row
     execContext.resetRow();
 
@@ -156,7 +159,7 @@ public class SparkMapRecordHandler extends SparkRecordHandler {
   @Override
   public void close() {
     // No row was processed
-    if (oc == null) {
+    if (!anyRow) {
       LOG.trace("Close called. no row processed by map.");
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3bfcfdde/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
index 2421885..d7488b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
@@ -49,6 +49,8 @@ public abstract class SparkRecordHandler {
   private long rowNumber = 0;
   private long nextLogThreshold = 1;
 
+  protected boolean anyRow = false;
+
   public <K, V> void init(JobConf job, OutputCollector<K, V> output, Reporter reporter) throws Exception {
     jc = job;
     MapredContext.init(false, new JobConf(jc));

http://git-wip-us.apache.org/repos/asf/hive/blob/3bfcfdde/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 7c1164b..06b0fae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -37,8 +37,6 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
@@ -295,6 +293,9 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
    */
   @Override
   public void processRow(Object key, final Object value) throws IOException {
+    if (!anyRow) {
+      anyRow = true;
+    }
     if (vectorized) {
       processVectorRow(key, value);
     } else {
@@ -307,6 +308,9 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
 
   @Override
   public <E> void processRow(Object key, Iterator<E> values) throws IOException {
+    if (!anyRow) {
+      anyRow = true;
+    }
     if (vectorized) {
       processVectorRows(key, values);
       return;
@@ -577,7 +581,7 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
   public void close() {
 
     // No row was processed
-    if (oc == null) {
+    if (!anyRow) {
       LOG.trace("Close called without any rows processed");
     }