You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@hive.apache.org by li...@apache.org on 2017/11/14 07:57:25 UTC
hive git commit: HIVE-17976: HoS: don't set output collector if there's no data to process (Rui reviewed by Xuefu)
Repository: hive
Updated Branches:
refs/heads/master d5f552528 -> 3bfcfdde0
HIVE-17976: HoS: don't set output collector if there's no data to process (Rui reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3bfcfdde
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3bfcfdde
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3bfcfdde
Branch: refs/heads/master
Commit: 3bfcfdde0c0be2aab1afdf5b1bc71fdcc9e77360
Parents: d5f5525
Author: Rui Li <li...@apache.org>
Authored: Tue Nov 14 15:57:20 2017 +0800
Committer: Rui Li <li...@apache.org>
Committed: Tue Nov 14 15:57:20 2017 +0800
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java | 7 +++++--
.../hadoop/hive/ql/exec/spark/SparkRecordHandler.java | 2 ++
.../hive/ql/exec/spark/SparkReduceRecordHandler.java | 10 +++++++---
3 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3bfcfdde/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index 8333cf5..c177a43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -94,7 +94,6 @@ public class SparkMapRecordHandler extends SparkRecordHandler {
mo.initializeLocalWork(jc);
mo.initializeMapOperator(jc);
- OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
mo.setReporter(rp);
if (localWork == null) {
@@ -124,6 +123,10 @@ public class SparkMapRecordHandler extends SparkRecordHandler {
@Override
public void processRow(Object key, Object value) throws IOException {
+ if (!anyRow) {
+ OperatorUtils.setChildrenCollector(mo.getChildOperators(), oc);
+ anyRow = true;
+ }
// reset the execContext for each new row
execContext.resetRow();
@@ -156,7 +159,7 @@ public class SparkMapRecordHandler extends SparkRecordHandler {
@Override
public void close() {
// No row was processed
- if (oc == null) {
+ if (!anyRow) {
LOG.trace("Close called. no row processed by map.");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3bfcfdde/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
index 2421885..d7488b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
@@ -49,6 +49,8 @@ public abstract class SparkRecordHandler {
private long rowNumber = 0;
private long nextLogThreshold = 1;
+ protected boolean anyRow = false;
+
public <K, V> void init(JobConf job, OutputCollector<K, V> output, Reporter reporter) throws Exception {
jc = job;
MapredContext.init(false, new JobConf(jc));
http://git-wip-us.apache.org/repos/asf/hive/blob/3bfcfdde/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 7c1164b..06b0fae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -37,8 +37,6 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
@@ -295,6 +293,9 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
*/
@Override
public void processRow(Object key, final Object value) throws IOException {
+ if (!anyRow) {
+ anyRow = true;
+ }
if (vectorized) {
processVectorRow(key, value);
} else {
@@ -307,6 +308,9 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
@Override
public <E> void processRow(Object key, Iterator<E> values) throws IOException {
+ if (!anyRow) {
+ anyRow = true;
+ }
if (vectorized) {
processVectorRows(key, values);
return;
@@ -577,7 +581,7 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
public void close() {
// No row was processed
- if (oc == null) {
+ if (!anyRow) {
LOG.trace("Close called without any rows processed");
}