Posted to dev@hive.apache.org by Chao Sun <ch...@cloudera.com> on 2014/10/24 01:56:30 UTC

Review Request 27117: HIVE-8457 - MapOperator initialization fails when multiple Spark threads is enabled [Spark Branch]

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27117/
-----------------------------------------------------------

Review request for hive and Xuefu Zhang.


Bugs: HIVE-8457
    https://issues.apache.org/jira/browse/HIVE-8457


Repository: hive-git


Description
-------

Currently, on the Spark branch, each thread is bound to a thread-local IOContext, which gets initialized when we generate an input HadoopRDD and is later used in MapOperator, FilterOperator, etc.
With the introduction of HIVE-8118, we may have multiple downstream RDDs that share the same input HadoopRDD, and we would like the HadoopRDD to be cached to avoid scanning the same table multiple times. A typical case looks like the following:
     inputRDD     inputRDD
        |            |
       MT_11        MT_12
        |            |
       RT_1         RT_2
Here, MT_11 and MT_12 are MapTran instances from a split MapWork, and RT_1 and RT_2 are two ReduceTran instances. Note that this example is simplified, as we may also have a ShuffleTran between each MapTran and ReduceTran.
When multiple Spark threads are running, MT_11 may be executed first. It will ask for an iterator from the HadoopRDD, which triggers the creation of the iterator, which in turn triggers the initialization of the IOContext associated with that particular thread.
Now, the problem is: before MT_12 starts executing, it will also ask for an iterator from the HadoopRDD, and since the RDD is already cached, instead of creating a new iterator, it will just fetch the cached result. However, this skips the initialization of the IOContext associated with MT_12's thread. So when MT_12 starts executing and tries to initialize the MapOperator, the initialization fails because the IOContext has not been initialized.
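
The failure mode described above can be reproduced outside Hive with a small Java sketch. Everything in it is a hypothetical stand-in rather than Hive or Spark code: DemoContext plays the role of the thread-local IOContext, CachedInput plays the role of the cached HadoopRDD, and initMapOperator plays the role of MapOperator initialization. The path /demo/table is made up. It only illustrates the mechanism and is not the patch under review.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalCacheDemo {

  // Hypothetical stand-in for IOContext: per-thread state set by the reader.
  static final class DemoContext {
    String inputPath; // stays null until a reader initializes it
  }

  // Each thread lazily gets its own, initially empty, context.
  static final ThreadLocal<DemoContext> CONTEXT =
      ThreadLocal.withInitial(DemoContext::new);

  // Hypothetical stand-in for a cached input RDD.
  static final class CachedInput {
    private List<String> cached;

    synchronized List<String> rows() {
      if (cached == null) {
        // First access: "scan the table" and, as a side effect,
        // initialize the context of the calling thread only.
        CONTEXT.get().inputPath = "/demo/table";
        cached = Arrays.asList("row1", "row2");
      }
      // Later accesses return the cached rows and skip the side effect.
      return cached;
    }
  }

  // Stand-in for operator initialization, which needs the context.
  static String initMapOperator(CachedInput input) {
    input.rows();
    String path = CONTEXT.get().inputPath;
    return path == null ? "FAILED: context not initialized" : "OK: " + path;
  }

  // Runs the same initialization on two distinct threads, one after the other.
  static String[] runTwoThreads() {
    CachedInput input = new CachedInput();
    Callable<String> task = () -> initMapOperator(input);
    ExecutorService first = Executors.newSingleThreadExecutor();
    ExecutorService second = Executors.newSingleThreadExecutor();
    try {
      String r1 = first.submit(task).get();
      String r2 = second.submit(task).get();
      return new String[] {r1, r2};
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      first.shutdown();
      second.shutdown();
    }
  }

  public static void main(String[] args) {
    for (String result : runTwoThreads()) {
      System.out.println(result);
    }
  }
}
```

In this sketch the first thread reports "OK: /demo/table" because it triggered the scan, while the second thread reads the cached rows and reports the failure, since its own thread-local context was never touched.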


Diffs
-----

  ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java 20ea977 
  ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java 00a6f3d 
  ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java 58e1ceb 

Diff: https://reviews.apache.org/r/27117/diff/


Testing
-------

All multi-insertion related tests are passing on my local machine.


Thanks,

Chao Sun


Re: Review Request 27117: HIVE-8457 - MapOperator initialization fails when multiple Spark threads is enabled [Spark Branch]

Posted by Xuefu Zhang <xz...@cloudera.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27117/#review58183
-----------------------------------------------------------



ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
<https://reviews.apache.org/r/27117/#comment99118>

    We don't need this, as this class is only used for Spark.



ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
<https://reviews.apache.org/r/27117/#comment99120>

    Let's use a name that is less likely to conflict, such as SPARK_MAP_IO_CONTEXT. Same below. It would be better to define a constant in SparkUtils.



ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
<https://reviews.apache.org/r/27117/#comment99122>

    We may need to copy other fields in IOContext besides input path.



ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
<https://reviews.apache.org/r/27117/#comment99124>

    Same as above



ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
<https://reviews.apache.org/r/27117/#comment99126>

    We need to copy every field.
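
One way to read the "copy every field" comments is sketched below in Java. This is an illustration under assumptions, not the actual patch: DemoContext is a hypothetical stand-in for IOContext, inputPath is the only field named in this review, and blockStart and sorted are invented to show why copying the input path alone would leave the rest of the state stale.

```java
class IoContextCopyDemo {

  // Hypothetical stand-in for IOContext. Only inputPath is named in the
  // review; blockStart and sorted are invented for illustration.
  static final class DemoContext {
    String inputPath;
    long blockStart;
    boolean sorted;

    // Copies every field. A field added to the class later must be
    // added here too, or it will be silently dropped on copy.
    void copyFrom(DemoContext other) {
      this.inputPath = other.inputPath;
      this.blockStart = other.blockStart;
      this.sorted = other.sorted;
    }
  }

  // Per-thread context, created empty on first use by each thread.
  static final ThreadLocal<DemoContext> CONTEXT =
      ThreadLocal.withInitial(DemoContext::new);

  // Copies state saved by the thread that created the iterator into the
  // calling thread's own context, and returns that context.
  static DemoContext restoreFrom(DemoContext saved) {
    DemoContext mine = CONTEXT.get();
    mine.copyFrom(saved);
    return mine;
  }

  public static void main(String[] args) {
    DemoContext saved = new DemoContext();
    saved.inputPath = "/demo/table";
    saved.blockStart = 128L;
    saved.sorted = true;

    DemoContext mine = restoreFrom(saved);
    System.out.println(mine.inputPath + " " + mine.blockStart + " " + mine.sorted);
  }
}
```

Keeping the copy logic in a single method on the context class means callers cannot forget a field, which seems to be the concern behind the comments on both files.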


- Xuefu Zhang


Re: Review Request 27117: HIVE-8457 - MapOperator initialization fails when multiple Spark threads is enabled [Spark Branch]

Posted by Chao Sun <ch...@cloudera.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27117/
-----------------------------------------------------------

(Updated Oct. 24, 2014, 4:51 p.m.)


Review request for hive and Xuefu Zhang.


Changes
-------

Thanks Xuefu for the comments. I've updated my patch.


Bugs: HIVE-8457
    https://issues.apache.org/jira/browse/HIVE-8457


Repository: hive-git


Description
-------

Currently, on the Spark branch, each thread is bound to a thread-local IOContext, which gets initialized when we generate an input HadoopRDD and is later used in MapOperator, FilterOperator, etc.
With the introduction of HIVE-8118, we may have multiple downstream RDDs that share the same input HadoopRDD, and we would like the HadoopRDD to be cached to avoid scanning the same table multiple times. A typical case looks like the following:
     inputRDD     inputRDD
        |            |
       MT_11        MT_12
        |            |
       RT_1         RT_2
Here, MT_11 and MT_12 are MapTran instances from a split MapWork, and RT_1 and RT_2 are two ReduceTran instances. Note that this example is simplified, as we may also have a ShuffleTran between each MapTran and ReduceTran.
When multiple Spark threads are running, MT_11 may be executed first. It will ask for an iterator from the HadoopRDD, which triggers the creation of the iterator, which in turn triggers the initialization of the IOContext associated with that particular thread.
Now, the problem is: before MT_12 starts executing, it will also ask for an iterator from the HadoopRDD, and since the RDD is already cached, instead of creating a new iterator, it will just fetch the cached result. However, this skips the initialization of the IOContext associated with MT_12's thread. So when MT_12 starts executing and tries to initialize the MapOperator, the initialization fails because the IOContext has not been initialized.


Diffs (updated)
-----

  ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java 20ea977 
  ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java 00a6f3d 
  ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java 4de3ad4 
  ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java 58e1ceb 
  ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java 5fb3b13 

Diff: https://reviews.apache.org/r/27117/diff/


Testing
-------

All multi-insertion related tests are passing on my local machine.


Thanks,

Chao Sun