You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hive.apache.org by "Sahil Takiar (JIRA)" <ji...@apache.org> on 2017/09/02 17:15:02 UTC
[jira] [Updated] (HIVE-17225) HoS DPP pruning sink ops can target
parallel work objects
[ https://issues.apache.org/jira/browse/HIVE-17225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sahil Takiar updated HIVE-17225:
--------------------------------
Attachment: HIVE-17225.5.patch
> HoS DPP pruning sink ops can target parallel work objects
> ---------------------------------------------------------
>
> Key: HIVE-17225
> URL: https://issues.apache.org/jira/browse/HIVE-17225
> Project: Hive
> Issue Type: Sub-task
> Components: Spark
> Affects Versions: 3.0.0
> Reporter: Sahil Takiar
> Assignee: Sahil Takiar
> Attachments: HIVE17225.1.patch, HIVE-17225.2.patch, HIVE-17225.3.patch, HIVE-17225.4.patch, HIVE-17225.5.patch
>
>
> Setup:
> {code:sql}
> SET hive.spark.dynamic.partition.pruning=true;
> SET hive.strict.checks.cartesian.product=false;
> SET hive.auto.convert.join=true;
> CREATE TABLE partitioned_table1 (col int) PARTITIONED BY (part_col int);
> CREATE TABLE regular_table1 (col int);
> CREATE TABLE regular_table2 (col int);
> ALTER TABLE partitioned_table1 ADD PARTITION (part_col = 1);
> ALTER TABLE partitioned_table1 ADD PARTITION (part_col = 2);
> ALTER TABLE partitioned_table1 ADD PARTITION (part_col = 3);
> INSERT INTO table regular_table1 VALUES (1), (2), (3), (4), (5), (6);
> INSERT INTO table regular_table2 VALUES (1), (2), (3), (4), (5), (6);
> INSERT INTO TABLE partitioned_table1 PARTITION (part_col = 1) VALUES (1);
> INSERT INTO TABLE partitioned_table1 PARTITION (part_col = 2) VALUES (2);
> INSERT INTO TABLE partitioned_table1 PARTITION (part_col = 3) VALUES (3);
> SELECT *
> FROM partitioned_table1,
> regular_table1 rt1,
> regular_table2 rt2
> WHERE rt1.col = partitioned_table1.part_col
> AND rt2.col = partitioned_table1.part_col;
> {code}
> Exception:
> {code}
> 2017-08-01T13:27:47,483 ERROR [b0d354a8-4cdb-4ba9-acec-27d14926aaf4 main] ql.Driver: FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.FileNotFoundException: File file:/Users/stakiar/Documents/idea/apache-hive/itests/qtest-spark/target/tmp/scratchdir/stakiar/b0d354a8-4cdb-4ba9-acec-27d14926aaf4/hive_2017-08-01_13-27-45_553_1088589686371686526-1/-mr-10004/3/5 does not exist
> at org.apache.hadoop.hive.ql.io.HiveInputFormat.init(HiveInputFormat.java:408)
> at org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getSplits(CombineHiveInputFormat.java:498)
> at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
> at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
> at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:82)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
> at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
> at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:82)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
> at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
> at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:82)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
> at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1.apply(AsyncRDDActions.scala:127)
> at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1.apply(AsyncRDDActions.scala:125)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
> at org.apache.spark.rdd.AsyncRDDActions.foreachAsync(AsyncRDDActions.scala:125)
> at org.apache.spark.api.java.JavaRDDLike$class.foreachAsync(JavaRDDLike.scala:731)
> at org.apache.spark.api.java.AbstractJavaRDDLike.foreachAsync(JavaRDDLike.scala:45)
> at org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient$JobStatusJob.call(RemoteHiveSparkClient.java:351)
> at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:358)
> at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:323)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.FileNotFoundException: File file:/Users/stakiar/Documents/idea/apache-hive/itests/qtest-spark/target/tmp/scratchdir/stakiar/b0d354a8-4cdb-4ba9-acec-27d14926aaf4/hive_2017-08-01_13-27-45_553_1088589686371686526-1/-mr-10004/3/5 does not exist
> at org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.processFiles(SparkDynamicPartitionPruner.java:147)
> at org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.prune(SparkDynamicPartitionPruner.java:76)
> at org.apache.hadoop.hive.ql.io.HiveInputFormat.init(HiveInputFormat.java:406)
> ... 62 more
> Caused by: java.io.FileNotFoundException: File file:/Users/stakiar/Documents/idea/apache-hive/itests/qtest-spark/target/tmp/scratchdir/stakiar/b0d354a8-4cdb-4ba9-acec-27d14926aaf4/hive_2017-08-01_13-27-45_553_1088589686371686526-1/-mr-10004/3/5 does not exist
> at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:431)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
> at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:674)
> at org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.processFiles(SparkDynamicPartitionPruner.java:119)
> ... 64 more
> {code}
> The explain plan for the query is:
> {code}
> STAGE DEPENDENCIES:
> Stage-2 is a root stage
> Stage-1 depends on stages: Stage-2
> Stage-0 depends on stages: Stage-1
> STAGE PLANS:
> Stage: Stage-2
> Spark
> DagName: stakiar_20170801202453_5376c59d-2eca-47ca-94bc-a3049f7fbd0a:39
> Vertices:
> Map 1
> Map Operator Tree:
> TableScan
> alias: partitioned_table1
> Statistics: Num rows: 3 Data size: 3 Basic stats: COMPLETE Column stats: NONE
> Select Operator
> expressions: col (type: int), part_col (type: int)
> outputColumnNames: _col0, _col1
> Statistics: Num rows: 3 Data size: 3 Basic stats: COMPLETE Column stats: NONE
> Spark HashTable Sink Operator
> keys:
> 0 _col1 (type: int)
> 1 _col0 (type: int)
> 2 _col0 (type: int)
> Local Work:
> Map Reduce Local Work
> Map 3
> Map Operator Tree:
> TableScan
> alias: rt2
> Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
> Filter Operator
> predicate: col is not null (type: boolean)
> Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
> Select Operator
> expressions: col (type: int)
> outputColumnNames: _col0
> Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
> Spark HashTable Sink Operator
> keys:
> 0 _col1 (type: int)
> 1 _col0 (type: int)
> 2 _col0 (type: int)
> Select Operator
> expressions: _col0 (type: int)
> outputColumnNames: _col0
> Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
> Group By Operator
> keys: _col0 (type: int)
> mode: hash
> outputColumnNames: _col0
> Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
> Spark Partition Pruning Sink Operator
> partition key expr: part_col
> Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
> target column name: part_col
> target work: Map 1
> Local Work:
> Map Reduce Local Work
> Stage: Stage-1
> Spark
> DagName: stakiar_20170801202453_5376c59d-2eca-47ca-94bc-a3049f7fbd0a:38
> Vertices:
> Map 2
> Map Operator Tree:
> TableScan
> alias: rt1
> Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
> Filter Operator
> predicate: col is not null (type: boolean)
> Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
> Select Operator
> expressions: col (type: int)
> outputColumnNames: _col0
> Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
> Map Join Operator
> condition map:
> Inner Join 0 to 1
> Inner Join 0 to 2
> keys:
> 0 _col1 (type: int)
> 1 _col0 (type: int)
> 2 _col0 (type: int)
> outputColumnNames: _col0, _col1, _col2, _col3
> input vertices:
> 0 Map 1
> 2 Map 3
> Statistics: Num rows: 13 Data size: 13 Basic stats: COMPLETE Column stats: NONE
> File Output Operator
> compressed: false
> Statistics: Num rows: 13 Data size: 13 Basic stats: COMPLETE Column stats: NONE
> table:
> input format: org.apache.hadoop.mapred.SequenceFileInputFormat
> output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
> serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> Local Work:
> Map Reduce Local Work
> Stage: Stage-0
> Fetch Operator
> limit: -1
> Processor Tree:
> ListSink
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)