Posted to commits@spark.apache.org by li...@apache.org on 2015/07/11 03:15:40 UTC

spark git commit: [SPARK-8961] [SQL] Makes BaseWriterContainer.outputWriterForRow accept InternalRow instead of Row

Repository: spark
Updated Branches:
  refs/heads/master b6fc0adf6 -> 336308836


[SPARK-8961] [SQL] Makes BaseWriterContainer.outputWriterForRow accept InternalRow instead of Row

This is a follow-up to [SPARK-8888] [1], continuing the effort to optimize dynamic partition writing.

Three further changes are made here:

1. Using `InternalRow` instead of `Row` in `BaseWriterContainer.outputWriterForRow`
2. Using `Cast` expressions to convert partition columns to strings, so that we can leverage code generation.
3. Replacing the FP-style `zip` and `map` calls with a faster imperative `while` loop (sketched below).

[1]: https://issues.apache.org/jira/browse/SPARK-8888
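To make changes 2 and 3 concrete, here is a minimal standalone sketch (plain Scala, not the actual commands.scala code): the names `PartitionPathSketch`, `escapePathName`, and the hard-coded default partition name are simplified stand-ins for the real helpers in `PartitioningUtils`. The first method mirrors the old `zip`/`map`/`mkString` style over untyped values; the second assumes the partition values have already been turned into strings up front (what the `Cast(_, StringType)` projection in the patch does) and concatenates them with an imperative `while` loop.

    object PartitionPathSketch {
      val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

      // Simplified stand-in for PartitioningUtils.escapePathName.
      def escapePathName(s: String): String = s.replace("/", "%2F")

      // Old style: FP-style zip/map/mkString over generic (untyped) partition values.
      def pathFunctional(partitionColumns: Seq[String], values: Seq[Any]): String =
        partitionColumns.zip(values).map { case (col, rawValue) =>
          val string = if (rawValue == null) null else String.valueOf(rawValue)
          val valueString =
            if (string == null || string.isEmpty) defaultPartitionName
            else escapePathName(string)
          s"$col=$valueString"
        }.mkString("/")

      // New style: values are already strings (cast by the projection), so an
      // imperative while loop only appends to a single StringBuilder.
      def pathImperative(partitionColumns: Array[String], values: Array[String]): String = {
        val builder = new StringBuilder
        var i = 0
        while (i < partitionColumns.length) {
          val string = values(i)
          val valueString =
            if (string == null || string.isEmpty) defaultPartitionName
            else escapePathName(string)
          if (i > 0) builder.append('/')
          builder.append(partitionColumns(i)).append('=').append(valueString)
          i += 1
        }
        builder.toString()
      }

      def main(args: Array[String]): Unit = {
        // Both print "year=2015/month=__HIVE_DEFAULT_PARTITION__".
        println(pathFunctional(Seq("year", "month"), Seq(2015, null)))
        println(pathImperative(Array("year", "month"), Array("2015", null)))
      }
    }

The imperative variant avoids the intermediate collections allocated by `zip` and `map` on every row, which matters because this code runs once per input row during dynamic partition writes.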

Author: Cheng Lian <li...@databricks.com>

Closes #7331 from liancheng/spark-8961 and squashes the following commits:

b5ab9ae [Cheng Lian] Casts Java iterator to Scala iterator explicitly
719e63b [Cheng Lian] Makes BaseWriterContainer.outputWriterForRow accept InternalRow instead of Row


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33630883
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33630883
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33630883

Branch: refs/heads/master
Commit: 33630883685eafcc3ee4521ea8363be342f6e6b4
Parents: b6fc0ad
Author: Cheng Lian <li...@databricks.com>
Authored: Fri Jul 10 18:15:36 2015 -0700
Committer: Cheng Lian <li...@databricks.com>
Committed: Fri Jul 10 18:15:36 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/sources/commands.scala | 73 +++++++++++---------
 1 file changed, 42 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/33630883/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 9189d17..5c6ef2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.sources
 
 import java.util.{Date, UUID}
 
+import scala.collection.JavaConversions.asScalaIterator
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
@@ -26,15 +28,14 @@ import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceF
 import org.apache.spark._
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode}
+import org.apache.spark.sql.types.StringType
 import org.apache.spark.util.SerializableConfiguration
 
 private[sql] case class InsertIntoDataSource(
@@ -170,14 +171,14 @@ private[sql] case class InsertIntoHadoopFsRelation(
       try {
         writerContainer.executorSideSetup(taskContext)
 
-        val converter = if (needsConversion) {
+        val converter: InternalRow => Row = if (needsConversion) {
           CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
         } else {
           r: InternalRow => r.asInstanceOf[Row]
         }
         while (iterator.hasNext) {
-          val row = converter(iterator.next())
-          writerContainer.outputWriterForRow(row).write(row)
+          val internalRow = iterator.next()
+          writerContainer.outputWriterForRow(internalRow).write(converter(internalRow))
         }
 
         writerContainer.commitTask()
@@ -239,7 +240,9 @@ private[sql] case class InsertIntoHadoopFsRelation(
       try {
         writerContainer.executorSideSetup(taskContext)
 
-        val partitionProj = newProjection(codegenEnabled, partitionOutput, output)
+        // Projects all partition columns and casts them to strings to build partition directories.
+        val partitionCasts = partitionOutput.map(Cast(_, StringType))
+        val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
         val dataProj = newProjection(codegenEnabled, dataOutput, output)
 
         val dataConverter: InternalRow => Row = if (needsConversion) {
@@ -247,15 +250,11 @@ private[sql] case class InsertIntoHadoopFsRelation(
         } else {
           r: InternalRow => r.asInstanceOf[Row]
         }
-        val partitionSchema = StructType.fromAttributes(partitionOutput)
-        val partConverter: InternalRow => Row =
-          CatalystTypeConverters.createToScalaConverter(partitionSchema)
-            .asInstanceOf[InternalRow => Row]
 
         while (iterator.hasNext) {
-          val row = iterator.next()
-          val partitionPart = partConverter(partitionProj(row))
-          val dataPart = dataConverter(dataProj(row))
+          val internalRow = iterator.next()
+          val partitionPart = partitionProj(internalRow)
+          val dataPart = dataConverter(dataProj(internalRow))
           writerContainer.outputWriterForRow(partitionPart).write(dataPart)
         }
 
@@ -435,7 +434,7 @@ private[sql] abstract class BaseWriterContainer(
   }
 
   // Called on executor side when writing rows
-  def outputWriterForRow(row: Row): OutputWriter
+  def outputWriterForRow(row: InternalRow): OutputWriter
 
   protected def initWriters(): Unit
 
@@ -477,7 +476,7 @@ private[sql] class DefaultWriterContainer(
     writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
   }
 
-  override def outputWriterForRow(row: Row): OutputWriter = writer
+  override def outputWriterForRow(row: InternalRow): OutputWriter = writer
 
   override def commitTask(): Unit = {
     try {
@@ -518,23 +517,36 @@ private[sql] class DynamicPartitionWriterContainer(
     outputWriters = new java.util.HashMap[String, OutputWriter]
   }
 
-  override def outputWriterForRow(row: Row): OutputWriter = {
-    // TODO (SPARK-8888): zip and all the stuff happening here is very inefficient.
-    val partitionPath = partitionColumns.zip(row.toSeq).map { case (col, rawValue) =>
-      val string = if (rawValue == null) null else String.valueOf(rawValue)
-      val valueString = if (string == null || string.isEmpty) {
-        defaultPartitionName
-      } else {
-        PartitioningUtils.escapePathName(string)
+  // The `row` argument is supposed to only contain partition column values which have been casted
+  // to strings.
+  override def outputWriterForRow(row: InternalRow): OutputWriter = {
+    val partitionPath = {
+      val partitionPathBuilder = new StringBuilder
+      var i = 0
+
+      while (i < partitionColumns.length) {
+        val col = partitionColumns(i)
+        val partitionValueString = {
+          val string = row.getString(i)
+          if (string.eq(null)) defaultPartitionName else PartitioningUtils.escapePathName(string)
+        }
+
+        if (i > 0) {
+          partitionPathBuilder.append(Path.SEPARATOR_CHAR)
+        }
+
+        partitionPathBuilder.append(s"$col=$partitionValueString")
+        i += 1
       }
-      s"/$col=$valueString"
-    }.mkString.stripPrefix(Path.SEPARATOR)
+
+      partitionPathBuilder.toString()
+    }
 
     val writer = outputWriters.get(partitionPath)
     if (writer.eq(null)) {
       val path = new Path(getWorkPath, partitionPath)
-      taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path",
-        new Path(outputPath, partitionPath).toString)
+      taskAttemptContext.getConfiguration.set(
+        "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
       val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
       outputWriters.put(partitionPath, newWriter)
       newWriter
@@ -545,8 +557,7 @@ private[sql] class DynamicPartitionWriterContainer(
 
   private def clearOutputWriters(): Unit = {
     if (!outputWriters.isEmpty) {
-      val iter = scala.collection.JavaConversions.asScalaIterator(outputWriters.values().iterator())
-      iter.foreach(_.close())
+      asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
       outputWriters.clear()
     }
   }

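For context on the hunk above: DynamicPartitionWriterContainer keeps one OutputWriter per partition directory in a java.util.HashMap and creates writers lazily on first use, closing them all in clearOutputWriters(). A minimal standalone model of that lookup is sketched below; SketchWriter, WriterCacheSketch, and the factory function are hypothetical names for illustration, not Spark's API.

    import scala.collection.JavaConverters.asScalaIteratorConverter

    // Hypothetical stand-in for Spark's OutputWriter; only what the sketch needs.
    trait SketchWriter {
      def write(record: String): Unit
      def close(): Unit
    }

    class WriterCacheSketch(newWriter: String => SketchWriter) {
      // One writer per partition path, created lazily on first use.
      private val writers = new java.util.HashMap[String, SketchWriter]

      def writerFor(partitionPath: String): SketchWriter = {
        val existing = writers.get(partitionPath)
        if (existing eq null) {
          val created = newWriter(partitionPath)
          writers.put(partitionPath, created)
          created
        } else {
          existing
        }
      }

      def closeAll(): Unit = {
        // Mirrors clearOutputWriters(): close every cached writer, then clear the map.
        writers.values().iterator().asScala.foreach(_.close())
        writers.clear()
      }
    }

In the actual patch, the map key is exactly the partition path string built by outputWriterForRow, so the cheaper path construction above directly reduces the per-row cost of this lookup.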

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org