Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/08 19:56:36 UTC

spark git commit: [SPARK-8888][SQL] Use java.util.HashMap in DynamicPartitionWriterContainer.

Repository: spark
Updated Branches:
  refs/heads/master 0ba98c04c -> f61c989b4


[SPARK-8888][SQL] Use java.util.HashMap in DynamicPartitionWriterContainer.

Just a baby step towards making it more efficient.
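
For readers skimming the diff below, a minimal, self-contained sketch of the lookup pattern the patch adopts. `WriterCache`, `createWriter`, and `closeAll` are illustrative names that do not exist in Spark; only the get / null-check / put shape mirrors the committed change to DynamicPartitionWriterContainer. A plain java.util.HashMap returns null on a miss, so the per-row lookup avoids the Option and closure allocations of mutable.Map.getOrElseUpdate.

    import java.util.{HashMap => JHashMap}

    // Sketch only: W stands in for OutputWriter, createWriter for the real factory.
    final class WriterCache[W <: AnyRef](createWriter: String => W) {
      private val writers = new JHashMap[String, W]

      def writerFor(partitionPath: String): W = {
        val existing = writers.get(partitionPath)  // null on a miss, no Option allocated
        if (existing ne null) {
          existing
        } else {
          val newWriter = createWriter(partitionPath)
          writers.put(partitionPath, newWriter)
          newWriter
        }
      }

      def closeAll(close: W => Unit): Unit = {
        val it = writers.values().iterator()
        while (it.hasNext) close(it.next())
        writers.clear()
      }
    }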

Author: Reynold Xin <rx...@databricks.com>

Closes #7282 from rxin/SPARK-8888 and squashes the following commits:

3da51ae [Reynold Xin] [SPARK-8888][SQL] Use java.util.HashMap in DynamicPartitionWriterContainer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f61c989b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f61c989b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f61c989b

Branch: refs/heads/master
Commit: f61c989b404808f79a58b6503cf3835cf602528a
Parents: 0ba98c0
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Jul 8 10:56:31 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jul 8 10:56:31 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/sources/commands.scala | 36 +++++++++++++-------
 1 file changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f61c989b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index a97142d..ecbc889 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.sources
 
 import java.util.{Date, UUID}
 
-import scala.collection.mutable
-
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
@@ -110,7 +108,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
         !exists
     }
     // If we are appending data to an existing dir.
-    val isAppend = (pathExists) && (mode == SaveMode.Append)
+    val isAppend = pathExists && (mode == SaveMode.Append)
 
     if (doInsertion) {
       val job = new Job(hadoopConf)
@@ -142,9 +140,12 @@ private[sql] case class InsertIntoHadoopFsRelation(
       }
     }
 
-    Seq.empty[InternalRow]
+    Seq.empty[Row]
   }
 
+  /**
+   * Inserts the content of the [[DataFrame]] into a table without any partitioning columns.
+   */
   private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = {
     // Uses local vals for serialization
     val needsConversion = relation.needConversion
@@ -188,6 +189,9 @@ private[sql] case class InsertIntoHadoopFsRelation(
     }
   }
 
+  /**
+   * Inserts the content of the [[DataFrame]] into a table with partitioning columns.
+   */
   private def insertWithDynamicPartitions(
       sqlContext: SQLContext,
       writerContainer: BaseWriterContainer,
@@ -497,13 +501,14 @@ private[sql] class DynamicPartitionWriterContainer(
   extends BaseWriterContainer(relation, job, isAppend) {
 
   // All output writers are created on executor side.
-  @transient protected var outputWriters: mutable.Map[String, OutputWriter] = _
+  @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
 
   override protected def initWriters(): Unit = {
-    outputWriters = mutable.Map.empty[String, OutputWriter]
+    outputWriters = new java.util.HashMap[String, OutputWriter]
   }
 
   override def outputWriterForRow(row: Row): OutputWriter = {
+    // TODO (SPARK-8888): zip and all the stuff happening here is very inefficient.
     val partitionPath = partitionColumns.zip(row.toSeq).map { case (col, rawValue) =>
       val string = if (rawValue == null) null else String.valueOf(rawValue)
       val valueString = if (string == null || string.isEmpty) {
@@ -514,18 +519,23 @@ private[sql] class DynamicPartitionWriterContainer(
       s"/$col=$valueString"
     }.mkString.stripPrefix(Path.SEPARATOR)
 
-    outputWriters.getOrElseUpdate(partitionPath, {
+    val writer = outputWriters.get(partitionPath)
+    if (writer.eq(null)) {
       val path = new Path(getWorkPath, partitionPath)
-      taskAttemptContext.getConfiguration.set(
-        "spark.sql.sources.output.path",
+      taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path",
         new Path(outputPath, partitionPath).toString)
-      outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
-    })
+      val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
+      outputWriters.put(partitionPath, newWriter)
+      newWriter
+    } else {
+      writer
+    }
   }
 
   private def clearOutputWriters(): Unit = {
-    if (outputWriters.nonEmpty) {
-      outputWriters.values.foreach(_.close())
+    if (!outputWriters.isEmpty) {
+      val iter = scala.collection.JavaConversions.asScalaIterator(outputWriters.values().iterator())
+      iter.foreach(_.close())
       outputWriters.clear()
     }
   }
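
As a follow-up illustration, here is a hypothetical, standalone sketch of the "/col=value" path that outputWriterForRow builds per row. The column names, sample values, and the default-partition constant are made up for this sketch, and the branch elided at the hunk boundary above is reduced to a simple fallback; it is not the exact Spark logic.

    // Hypothetical example; values and defaultPartitionName are placeholders.
    object PartitionPathSketch extends App {
      val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"  // placeholder, not read from config
      val partitionColumns = Seq("year", "month")
      val rowValues: Seq[Any] = Seq(2015, null)                // a null value falls back to the default

      val partitionPath = partitionColumns.zip(rowValues).map { case (col, rawValue) =>
        val string = if (rawValue == null) null else String.valueOf(rawValue)
        val valueString = if (string == null || string.isEmpty) defaultPartitionName else string
        s"/$col=$valueString"
      }.mkString.stripPrefix("/")

      println(partitionPath)  // prints: year=2015/month=__HIVE_DEFAULT_PARTITION__
    }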

