You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2019/01/26 18:17:13 UTC

[spark] branch master updated: [SPARK-26630][SQL] Support reading Hive-serde tables whose INPUTFORMAT is org.apache.hadoop.mapreduce

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e71acd9  [SPARK-26630][SQL] Support reading Hive-serde tables whose INPUTFORMAT is org.apache.hadoop.mapreduce
e71acd9 is described below

commit e71acd9a23926e6c84e462009f145a80ea24bf85
Author: heguozi <zy...@gmail.com>
AuthorDate: Sat Jan 26 10:17:03 2019 -0800

    [SPARK-26630][SQL] Support reading Hive-serde tables whose INPUTFORMAT is org.apache.hadoop.mapreduce
    
    ## What changes were proposed in this pull request?
    
    When we read a hive table and create RDDs in `TableReader`, it'll throw exception `java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.TextInputFormat cannot be cast to org.apache.hadoop.mapred.InputFormat` if the input format class of the table is from mapreduce package.
    
    Now we use NewHadoopRDD to deal with the new input format and keep HadoopRDD to the old one.
    
    This PR is from #23506. We can reproduce this issue by executing the new test with the code in old version. When create a table with `org.apache.hadoop.mapreduce.....` input format, we will find the exception thrown in `org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:190)`
    
    ## How was this patch tested?
    
    Added a new test.
    
    Closes #23559 from Deegue/fix-hadoopRDD.
    
    Lead-authored-by: heguozi <zy...@gmail.com>
    Co-authored-by: Yizhong Zhang <zy...@163.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../org/apache/spark/sql/hive/TableReader.scala    |  61 +++++++++---
 .../spark/sql/hive/execution/HiveDDLSuite.scala    | 102 +++++++++++++++++++++
 2 files changed, 149 insertions(+), 14 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 536bc4a..ad11719 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -31,12 +31,13 @@ import org.apache.hadoop.hive.serde2.Deserializer
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat => oldInputClass, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
+import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.CastSupport
@@ -123,9 +124,7 @@ class HadoopTableReader(
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
 
     // logDebug("Table input: %s".format(tablePath))
-    val ifc = hiveTable.getInputFormatClass
-      .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
-    val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
+    val hadoopRDD = createHadoopRDD(localTableDesc, inputPathStr)
 
     val attrsWithIndex = attributes.zipWithIndex
     val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
@@ -164,7 +163,7 @@ class HadoopTableReader(
     def verifyPartitionPath(
         partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
         Map[HivePartition, Class[_ <: Deserializer]] = {
-      if (!sparkSession.sessionState.conf.verifyPartitionPath) {
+      if (!conf.verifyPartitionPath) {
         partitionToDeserializer
       } else {
         val existPathSet = collection.mutable.Set[String]()
@@ -202,8 +201,6 @@ class HadoopTableReader(
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = partition.getDataLocation
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
-      val ifc = partDesc.getInputFileFormatClass
-        .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
       // Get partition field info
       val partSpec = partDesc.getPartSpec
       val partProps = partDesc.getProperties
@@ -243,7 +240,7 @@ class HadoopTableReader(
 
       // Create local references so that the outer object isn't serialized.
       val localTableDesc = tableDesc
-      createHadoopRdd(localTableDesc, inputPathStr, ifc).mapPartitions { iter =>
+      createHadoopRDD(localTableDesc, inputPathStr).mapPartitions { iter =>
         val hconf = broadcastedHiveConf.value.value
         val deserializer = localDeserializer.getConstructor().newInstance()
         // SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema
@@ -289,15 +286,28 @@ class HadoopTableReader(
   }
 
   /**
+   * The entry of creating a RDD.
+   * [SPARK-26630] Using which HadoopRDD will be decided by the input format of tables.
+   * The input format of NewHadoopRDD is from `org.apache.hadoop.mapreduce` package while
+   * the input format of HadoopRDD is from `org.apache.hadoop.mapred` package.
+   */
+  private def createHadoopRDD(localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
+    val inputFormatClazz = localTableDesc.getInputFileFormatClass
+    if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
+      createNewHadoopRDD(localTableDesc, inputPathStr)
+    } else {
+      createOldHadoopRDD(localTableDesc, inputPathStr)
+    }
+  }
+
+  /**
    * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
    * applied locally on each slave.
    */
-  private def createHadoopRdd(
-    tableDesc: TableDesc,
-    path: String,
-    inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
-
+  private def createOldHadoopRDD(tableDesc: TableDesc, path: String): RDD[Writable] = {
     val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
+    val inputFormatClass = tableDesc.getInputFileFormatClass
+      .asInstanceOf[Class[oldInputClass[Writable, Writable]]]
 
     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
@@ -311,6 +321,29 @@ class HadoopTableReader(
     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
   }
+
+  /**
+   * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRDD(tableDesc: TableDesc, path: String): RDD[Writable] = {
+    val newJobConf = new JobConf(hadoopConf)
+    HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc)(newJobConf)
+    val inputFormatClass = tableDesc.getInputFileFormatClass
+      .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+
+    val rdd = new NewHadoopRDD(
+      sparkSession.sparkContext,
+      inputFormatClass,
+      classOf[Writable],
+      classOf[Writable],
+      newJobConf
+    )
+
+    // Only take the value (skip the key) because Hive works only with values.
+    rdd.map(_._2)
+  }
+
 }
 
 private[hive] object HiveTableUtil {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 6abdc40..5e97a05 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -260,6 +260,108 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
       assert(err.contains("Cannot recognize hive type string:"))
    }
   }
+
+  test("SPARK-26630: table with old input format and without partitioned will use HadoopRDD") {
+    withTable("table_old", "table_ctas_old") {
+      sql(
+        """
+          |CREATE TABLE table_old (col1 LONG, col2 STRING, col3 DOUBLE, col4 BOOLEAN)
+          |STORED AS
+          |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+          |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+        """.stripMargin)
+      sql(
+        """
+          |INSERT INTO table_old
+          |VALUES (2147483648, 'AAA', 3.14, false), (2147483649, 'BBB', 3.142, true)
+        """.stripMargin)
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 FROM table_old"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
+
+      sql("CREATE TABLE table_ctas_old AS SELECT col1, col2, col3, col4 FROM table_old")
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 from table_ctas_old"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
+    }
+  }
+
+  test("SPARK-26630: table with old input format and partitioned will use HadoopRDD") {
+    withTable("table_pt_old", "table_ctas_pt_old") {
+      sql(
+        """
+          |CREATE TABLE table_pt_old (col1 LONG, col2 STRING, col3 DOUBLE, col4 BOOLEAN)
+          |PARTITIONED BY (pt INT)
+          |STORED AS
+          |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+          |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+        """.stripMargin)
+      sql(
+        """
+          |INSERT INTO table_pt_old PARTITION (pt = 1)
+          |VALUES (2147483648, 'AAA', 3.14, false), (2147483649, 'BBB', 3.142, true)
+        """.stripMargin)
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 FROM table_pt_old WHERE pt = 1"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
+
+      sql("CREATE TABLE table_ctas_pt_old AS SELECT col1, col2, col3, col4 FROM table_pt_old")
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 from table_ctas_pt_old"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
+    }
+  }
+
+  test("SPARK-26630: table with new input format and without partitioned will use NewHadoopRDD") {
+    withTable("table_new", "table_ctas_new") {
+      sql(
+        """
+          |CREATE TABLE table_new (col1 LONG, col2 STRING, col3 DOUBLE, col4 BOOLEAN)
+          |STORED AS
+          |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
+          |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+        """.stripMargin)
+      sql(
+        """
+          |INSERT INTO table_new
+          |VALUES (2147483648, 'AAA', 3.14, false), (2147483649, 'BBB', 3.142, true)
+        """.stripMargin)
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 FROM table_new"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
+
+      sql("CREATE TABLE table_ctas_new AS SELECT col1, col2, col3, col4 FROM table_new")
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 from table_ctas_new"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
+    }
+  }
+
+  test("SPARK-26630: table with new input format and partitioned will use NewHadoopRDD") {
+    withTable("table_pt_new", "table_ctas_pt_new") {
+      sql(
+        """
+          |CREATE TABLE table_pt_new (col1 LONG, col2 STRING, col3 DOUBLE, col4 BOOLEAN)
+          |PARTITIONED BY (pt INT)
+          |STORED AS
+          |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
+          |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+        """.stripMargin)
+      sql(
+        """
+          |INSERT INTO table_pt_new PARTITION (pt = 1)
+          |VALUES (2147483648, 'AAA', 3.14, false), (2147483649, 'BBB', 3.142, true)
+        """.stripMargin)
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 FROM table_pt_new WHERE pt = 1"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
+
+      sql("CREATE TABLE table_ctas_pt_new AS SELECT col1, col2, col3, col4 FROM table_pt_new")
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 from table_ctas_pt_new"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
+    }
+  }
 }
 
 class HiveDDLSuite


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org