You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/04/27 08:42:45 UTC

spark git commit: [SPARK-14944][SPARK-14943][SQL] Remove HiveConf from HiveTableScanExec, HiveTableReader, and ScriptTransformation

Repository: spark
Updated Branches:
  refs/heads/master b2a456064 -> d73d67f62


[SPARK-14944][SPARK-14943][SQL] Remove HiveConf from HiveTableScanExec, HiveTableReader, and ScriptTransformation

## What changes were proposed in this pull request?
This patch removes HiveConf from HiveTableScanExec and HiveTableReader and instead just uses our own configuration system. I'm splitting the large change of removing HiveConf into multiple independent pull requests because it is very difficult to debug test failures when they are all combined in one giant one.

## How was this patch tested?
Should be covered by existing tests.

Author: Reynold Xin <rx...@databricks.com>

Closes #12727 from rxin/SPARK-14944.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d73d67f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d73d67f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d73d67f6

Branch: refs/heads/master
Commit: d73d67f623dd65b90d0edbd7ba62e9a6ce7ebd1e
Parents: b2a4560
Author: Reynold Xin <rx...@databricks.com>
Authored: Tue Apr 26 23:42:42 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Apr 26 23:42:42 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/HiveSessionState.scala       |  1 -
 .../apache/spark/sql/hive/HiveStrategies.scala  |  9 ++----
 .../org/apache/spark/sql/hive/TableReader.scala | 32 ++++++++++----------
 .../sql/hive/execution/HiveTableScanExec.scala  | 13 ++++----
 .../hive/execution/ScriptTransformation.scala   | 20 ++++++------
 .../execution/ScriptTransformationSuite.scala   |  8 ++---
 6 files changed, 37 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d73d67f6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 9633f9e..f071df7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -107,7 +107,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
     new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)
       with HiveStrategies {
       override val sparkSession: SparkSession = self.sparkSession
-      override val hiveconf: HiveConf = self.hiveconf
 
       override def strategies: Seq[Strategy] = {
         experimentalMethods.extraStrategies ++ Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/d73d67f6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 7d1f87f..71b180e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,16 +17,12 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.hadoop.hive.conf.HiveConf
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.execution._
 
 private[hive] trait HiveStrategies {
@@ -34,13 +30,12 @@ private[hive] trait HiveStrategies {
   self: SparkPlanner =>
 
   val sparkSession: SparkSession
-  val hiveconf: HiveConf
 
   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.ScriptTransformation(input, script, output, child, ioschema) =>
         val hiveIoSchema = HiveScriptIOSchema(ioschema)
-        ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil
+        ScriptTransformation(input, script, output, planLater(child), hiveIoSchema) :: Nil
       case _ => Nil
     }
   }
@@ -78,7 +73,7 @@ private[hive] trait HiveStrategies {
           projectList,
           otherPredicates,
           identity[Seq[Expression]],
-          HiveTableScanExec(_, relation, pruningPredicates)(sparkSession, hiveconf)) :: Nil
+          HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
       case _ =>
         Nil
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/d73d67f6/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index af0317f..df6abc2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,17 +17,14 @@
 
 package org.apache.spark.sql.hive
 
-import java.util
-
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.serde2.Deserializer
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters,
-  StructObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
@@ -62,7 +59,7 @@ class HadoopTableReader(
     @transient private val attributes: Seq[Attribute],
     @transient private val relation: MetastoreRelation,
     @transient private val sparkSession: SparkSession,
-    hiveconf: HiveConf)
+    hadoopConf: Configuration)
   extends TableReader with Logging {
 
   // Hadoop honors "mapred.map.tasks" as hint, but will ignore when mapred.job.tracker is "local".
@@ -72,12 +69,15 @@ class HadoopTableReader(
   private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
     0 // will splitted based on block by default.
   } else {
-    math.max(hiveconf.getInt("mapred.map.tasks", 1), sparkSession.sparkContext.defaultMinPartitions)
+    math.max(hadoopConf.getInt("mapred.map.tasks", 1),
+      sparkSession.sparkContext.defaultMinPartitions)
   }
 
-  SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sparkSession.sparkContext.conf, hiveconf)
-  private val _broadcastedHiveConf =
-    sparkSession.sparkContext.broadcast(new SerializableConfiguration(hiveconf))
+  SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(
+    sparkSession.sparkContext.conf, hadoopConf)
+
+  private val _broadcastedHadoopConf =
+    sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
   override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
     makeRDDForTable(
@@ -105,7 +105,7 @@ class HadoopTableReader(
     // Create local references to member variables, so that the entire `this` object won't be
     // serialized in the closure below.
     val tableDesc = relation.tableDesc
-    val broadcastedHiveConf = _broadcastedHiveConf
+    val broadcastedHiveConf = _broadcastedHadoopConf
 
     val tablePath = hiveTable.getPath
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
@@ -162,7 +162,7 @@ class HadoopTableReader(
           case (partition, partDeserializer) =>
             def updateExistPathSetByPathPattern(pathPatternStr: String) {
               val pathPattern = new Path(pathPatternStr)
-              val fs = pathPattern.getFileSystem(hiveconf)
+              val fs = pathPattern.getFileSystem(hadoopConf)
               val matches = fs.globStatus(pathPattern)
               matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
             }
@@ -209,7 +209,7 @@ class HadoopTableReader(
 
       // Create local references so that the outer object isn't serialized.
       val tableDesc = relation.tableDesc
-      val broadcastedHiveConf = _broadcastedHiveConf
+      val broadcastedHiveConf = _broadcastedHadoopConf
       val localDeserializer = partDeserializer
       val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
 
@@ -259,7 +259,7 @@ class HadoopTableReader(
   private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = {
     filterOpt match {
       case Some(filter) =>
-        val fs = path.getFileSystem(hiveconf)
+        val fs = path.getFileSystem(hadoopConf)
         val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
         filteredFiles.mkString(",")
       case None => path.toString
@@ -279,7 +279,7 @@ class HadoopTableReader(
 
     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
-      _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableConfiguration]],
+      _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
       Some(initializeJobConfFunc),
       inputFormatClass,
       classOf[Writable],
@@ -302,7 +302,7 @@ private[hive] object HiveTableUtil {
     val storageHandler =
       org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(jobConf, property)
     if (storageHandler != null) {
-      val jobProperties = new util.LinkedHashMap[String, String]
+      val jobProperties = new java.util.LinkedHashMap[String, String]
       if (input) {
         storageHandler.configureInputJobProperties(tableDesc, jobProperties)
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/d73d67f6/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index cc5bbf5..007c338 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector._
@@ -48,8 +48,7 @@ case class HiveTableScanExec(
     requestedAttributes: Seq[Attribute],
     relation: MetastoreRelation,
     partitionPruningPred: Seq[Expression])(
-    @transient private val sparkSession: SparkSession,
-    @transient private val hiveconf: HiveConf)
+    @transient private val sparkSession: SparkSession)
   extends LeafExecNode {
 
   require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
@@ -77,20 +76,20 @@ case class HiveTableScanExec(
   // Create a local copy of hiveconf,so that scan specific modifications should not impact
   // other queries
   @transient
-  private[this] val hiveExtraConf = new HiveConf(hiveconf)
+  private[this] val hadoopConf = sparkSession.sessionState.newHadoopConf()
 
   // append columns ids and names before broadcast
-  addColumnMetadataToConf(hiveExtraConf)
+  addColumnMetadataToConf(hadoopConf)
 
   @transient
   private[this] val hadoopReader =
-    new HadoopTableReader(attributes, relation, sparkSession, hiveExtraConf)
+    new HadoopTableReader(attributes, relation, sparkSession, hadoopConf)
 
   private[this] def castFromString(value: String, dataType: DataType) = {
     Cast(Literal(value), dataType).eval(null)
   }
 
-  private def addColumnMetadataToConf(hiveConf: HiveConf) {
+  private def addColumnMetadataToConf(hiveConf: Configuration) {
     // Specifies needed column IDs for those non-partitioning columns.
     val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d73d67f6/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index f27337e..f6e6a75 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.exec.{RecordReader, RecordWriter}
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.AbstractSerDe
@@ -58,17 +57,14 @@ case class ScriptTransformation(
     script: String,
     output: Seq[Attribute],
     child: SparkPlan,
-    ioschema: HiveScriptIOSchema)(@transient private val hiveconf: HiveConf)
+    ioschema: HiveScriptIOSchema)
   extends UnaryExecNode {
 
-  override protected def otherCopyArgs: Seq[HiveConf] = hiveconf :: Nil
-
   override def producedAttributes: AttributeSet = outputSet -- inputSet
 
-  private val serializedHiveConf = new SerializableConfiguration(hiveconf)
-
   protected override def doExecute(): RDD[InternalRow] = {
-    def processIterator(inputIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
+    def processIterator(inputIterator: Iterator[InternalRow], hadoopConf: Configuration)
+      : Iterator[InternalRow] = {
       val cmd = List("/bin/bash", "-c", script)
       val builder = new ProcessBuilder(cmd.asJava)
 
@@ -76,7 +72,6 @@ case class ScriptTransformation(
       val inputStream = proc.getInputStream
       val outputStream = proc.getOutputStream
       val errorStream = proc.getErrorStream
-      val localHiveConf = serializedHiveConf.value
 
       // In order to avoid deadlocks, we need to consume the error output of the child process.
       // To avoid issues caused by large error output, we use a circular buffer to limit the amount
@@ -107,7 +102,7 @@ case class ScriptTransformation(
         proc,
         stderrBuffer,
         TaskContext.get(),
-        localHiveConf
+        hadoopConf
       )
 
       // This nullability is a performance optimization in order to avoid an Option.foreach() call
@@ -122,7 +117,7 @@ case class ScriptTransformation(
         val scriptOutputStream = new DataInputStream(inputStream)
 
         @Nullable val scriptOutputReader =
-          ioschema.recordReader(scriptOutputStream, localHiveConf).orNull
+          ioschema.recordReader(scriptOutputStream, hadoopConf).orNull
 
         var scriptOutputWritable: Writable = null
         val reusedWritableObject: Writable = if (null != outputSerde) {
@@ -214,10 +209,13 @@ case class ScriptTransformation(
       outputIterator
     }
 
+    val broadcastedHadoopConf =
+      new SerializableConfiguration(sqlContext.sessionState.newHadoopConf())
+
     child.execute().mapPartitions { iter =>
       if (iter.hasNext) {
         val proj = UnsafeProjection.create(schema)
-        processIterator(iter).map(proj)
+        processIterator(iter, broadcastedHadoopConf.value).map(proj)
       } else {
         // If the input iterator has no rows then do not launch the external script.
         Iterator.empty

http://git-wip-us.apache.org/repos/asf/spark/blob/d73d67f6/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 1a15fb7..19e8025 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -58,7 +58,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
         output = Seq(AttributeReference("a", StringType)()),
         child = child,
         ioschema = noSerdeIOSchema
-      )(hiveContext.sessionState.hiveconf),
+      ),
       rowsDf.collect())
   }
 
@@ -72,7 +72,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
         output = Seq(AttributeReference("a", StringType)()),
         child = child,
         ioschema = serdeIOSchema
-      )(hiveContext.sessionState.hiveconf),
+      ),
       rowsDf.collect())
   }
 
@@ -87,7 +87,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
           output = Seq(AttributeReference("a", StringType)()),
           child = ExceptionInjectingOperator(child),
           ioschema = noSerdeIOSchema
-        )(hiveContext.sessionState.hiveconf),
+        ),
         rowsDf.collect())
     }
     assert(e.getMessage().contains("intentional exception"))
@@ -104,7 +104,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
           output = Seq(AttributeReference("a", StringType)()),
           child = ExceptionInjectingOperator(child),
           ioschema = serdeIOSchema
-        )(hiveContext.sessionState.hiveconf),
+        ),
         rowsDf.collect())
     }
     assert(e.getMessage().contains("intentional exception"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org