Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/22 22:22:08 UTC

[GitHub] [spark] imback82 commented on a change in pull request #32198: [SPARK-26164][SQL] Allow concurrent writers for writing dynamic partitions and bucket table

imback82 commented on a change in pull request #32198:
URL: https://github.com/apache/spark/pull/32198#discussion_r618770419



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
##########
@@ -255,25 +331,180 @@ class DynamicPartitionDataWriter(
       }
       if (isBucketed) {
         currentBucketId = nextBucketId
-        statsTrackers.foreach(_.newBucket(currentBucketId.get))
       }
 
       fileCounter = 0
-      newOutputWriter(currentPartitionValues, currentBucketId)
+      renewCurrentWriter(currentPartitionValues, currentBucketId, closeCurrentWriter = true)
     } else if (description.maxRecordsPerFile > 0 &&
       recordsInFile >= description.maxRecordsPerFile) {
-      // Exceeded the threshold in terms of the number of records per file.
-      // Create a new file by increasing the file counter.
-      fileCounter += 1
-      assert(fileCounter < MAX_FILE_COUNTER,
-        s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")
+      increaseFileCounter(currentPartitionValues, currentBucketId)
+    }
+    writeRecord(record)
+  }
+}
+
+/**
+ * Dynamic partition writer with concurrent writers, meaning multiple concurrent writers are opened
+ * for writing.
+ *
+ * The process has the following steps:
+ *  - Step 1: Maintain a map of output writers, keyed by the partition and/or bucket values. Keep
+ *            all writers open and write rows one by one.
+ *  - Step 2: If the number of concurrent writers exceeds the limit, sort the remaining rows on the
+ *            partition and/or bucket column(s). Write rows one by one, and eagerly close each
+ *            writer once its partition and/or bucket is finished.
+ *
+ * The caller is expected to call `writeWithIterator()` instead of `write()` to write records.
+ */
+class DynamicPartitionDataConcurrentWriter(
+    description: WriteJobDescription,
+    taskAttemptContext: TaskAttemptContext,
+    committer: FileCommitProtocol,
+    concurrentOutputWriterSpec: ConcurrentOutputWriterSpec)
+  extends BaseDynamicPartitionDataWriter(description, taskAttemptContext, committer) {
+
+  /** Wrapper class to index a unique concurrent output writer. */
+  private case class WriterIndex(
+    var partitionValues: Option[UnsafeRow],
+    var bucketId: Option[Int])
+
+  /** Wrapper class for status of a unique concurrent output writer. */
+  private class WriterStatus(
+    var outputWriter: OutputWriter,
+    var recordsInFile: Long,
+    var fileCounter: Int)
+
+  /**
+   * State to indicate if we are falling back to sort-based writer.
+   * Because we first try to use concurrent writers, its initial value is false.
+   */
+  private var sorted: Boolean = false
+  private val concurrentWriters = mutable.HashMap[WriterIndex, WriterStatus]()
+  private val currentWriterId = WriterIndex(None, None)
+
+  /**
+   * Release resources for all concurrent output writers.
+   */
+  override protected def releaseResources(): Unit = {
+    currentWriter = null
+    concurrentWriters.values.foreach(status => {
+      if (status.outputWriter != null) {
+        try {
+          status.outputWriter.close()
+        } finally {
+          status.outputWriter = null
+        }
+      }
+    })
+    concurrentWriters.clear()
+  }
 
-      newOutputWriter(currentPartitionValues, currentBucketId)
+  override def write(record: InternalRow): Unit = {
+    val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None
+    val nextBucketId = if (isBucketed) Some(getBucketId(record)) else None
+
+    if (currentWriterId.partitionValues != nextPartitionValues ||
+      currentWriterId.bucketId != nextBucketId) {
+      // See a new partition or bucket - write to a new partition dir (or a new bucket file).
+      if (currentWriter != null) {
+        if (!sorted) {
+          // Update writer status in concurrent writers map, because the writer is probably needed
+          // again later for writing other rows.
+          updateCurrentWriterStatusInMap()
+        } else {
+          // Remove writer status in concurrent writers map and release current writer resource,
+          // because the writer is not needed any more.
+          concurrentWriters.remove(currentWriterId)
+          releaseCurrentWriter()
+        }
+      }
+
+      if (isBucketed) {
+        currentWriterId.bucketId = nextBucketId
+      }
+      if (isPartitioned && currentWriterId.partitionValues != nextPartitionValues) {
+        currentWriterId.partitionValues = Some(nextPartitionValues.get.copy())
+        if (!concurrentWriters.contains(currentWriterId)) {
+          statsTrackers.foreach(_.newPartition(currentWriterId.partitionValues.get))
+        }
+      }
+      retrieveWriterInMap()
     }
-    val outputRow = getOutputRow(record)
-    currentWriter.write(outputRow)
-    statsTrackers.foreach(_.newRow(outputRow))
-    recordsInFile += 1
+
+    if (description.maxRecordsPerFile > 0 &&
+      recordsInFile >= description.maxRecordsPerFile) {
+      increaseFileCounter(currentWriterId.partitionValues, currentWriterId.bucketId)
+      // Update writer status in concurrent writers map, as a new writer is created.
+      updateCurrentWriterStatusInMap()
+    }
+    writeRecord(record)
+  }
+
+  /**
+   * Write iterator of records with concurrent writers.
+   */
+  override def writeWithIterator(iterator: Iterator[InternalRow]): Unit = {
+    while (iterator.hasNext && !sorted) {
+      write(iterator.next())
+    }
+
+    if (iterator.hasNext) {
+      clearCurrentWriterStatus()
+      val sorter = concurrentOutputWriterSpec.createSorter()
+      val sortIterator = sorter.sort(iterator.asInstanceOf[Iterator[UnsafeRow]])
+      while (sortIterator.hasNext) {
+        write(sortIterator.next())
+      }
+    }
+  }
+
+  /**
+   * Update current writer status in map.
+   */
+  private def updateCurrentWriterStatusInMap(): Unit = {
+    val status = concurrentWriters(currentWriterId)
+    status.outputWriter = currentWriter
+    status.recordsInFile = recordsInFile
+    status.fileCounter = fileCounter
+  }
+
+  /**
+   * Retrieve the writer from the map, or create a new writer if one does not exist.
+   */
+  private def retrieveWriterInMap(): Unit = {
+    if (concurrentWriters.contains(currentWriterId)) {
+      val status = concurrentWriters(currentWriterId)
+      currentWriter = status.outputWriter
+      recordsInFile = status.recordsInFile
+      fileCounter = status.fileCounter
+    } else {
+      fileCounter = 0
+      renewCurrentWriter(
+        currentWriterId.partitionValues,
+        currentWriterId.bucketId,
+        closeCurrentWriter = false)
+      concurrentWriters.put(
+        WriterIndex(currentWriterId.partitionValues, currentWriterId.bucketId),
+        new WriterStatus(currentWriter, recordsInFile, fileCounter))
+      if (concurrentWriters.size > concurrentOutputWriterSpec.maxWriters && !sorted) {

Review comment:
       nit: Looks like we will be off by one? (The config's doc says "Maximum number of output file writers to use concurrently.")
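
For illustration, here is a minimal standalone Scala sketch of the off-by-one being pointed out (the object and method names are hypothetical, not part of the PR): because the `size > maxWriters` check only runs after the new writer has been put into the map, it looks like the map can end up holding `maxWriters + 1` open writers before the sort-based fallback is triggered. A `>=` comparison, or checking before the put, would be one way to cap it at exactly `maxWriters`.

    // Minimal standalone sketch (hypothetical names, not the PR code) of the off-by-one:
    // the limit is only checked after the new writer has been added to the map.
    import scala.collection.mutable

    object OffByOneSketch {
      def main(args: Array[String]): Unit = {
        val maxWriters = 2
        val writers = mutable.HashMap[String, String]()
        var fellBackToSort = false

        def openWriterFor(key: String): Unit = {
          if (!writers.contains(key)) {
            writers.put(key, s"writer-$key")   // the writer is opened and registered first ...
            if (writers.size > maxWriters) {   // ... and the limit is only checked afterwards
              fellBackToSort = true
            }
          }
        }

        Seq("p=1", "p=2", "p=3").foreach(openWriterFor)
        // Prints: open writers = 3, fellBackToSort = true -- three open writers for maxWriters = 2.
        println(s"open writers = ${writers.size}, fellBackToSort = $fellBackToSort")
      }
    }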

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
##########
@@ -255,25 +331,180 @@ class DynamicPartitionDataWriter(
       }
       if (isBucketed) {
         currentBucketId = nextBucketId
-        statsTrackers.foreach(_.newBucket(currentBucketId.get))
       }
 
       fileCounter = 0
-      newOutputWriter(currentPartitionValues, currentBucketId)
+      renewCurrentWriter(currentPartitionValues, currentBucketId, closeCurrentWriter = true)
     } else if (description.maxRecordsPerFile > 0 &&
       recordsInFile >= description.maxRecordsPerFile) {
-      // Exceeded the threshold in terms of the number of records per file.
-      // Create a new file by increasing the file counter.
-      fileCounter += 1
-      assert(fileCounter < MAX_FILE_COUNTER,
-        s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")
+      increaseFileCounter(currentPartitionValues, currentBucketId)
+    }
+    writeRecord(record)
+  }
+}
+
+/**
+ * Dynamic partition writer with concurrent writers, meaning multiple concurrent writers are opened
+ * for writing.
+ *
+ * The process has the following steps:
+ *  - Step 1: Maintain a map of output writers, keyed by the partition and/or bucket values. Keep
+ *            all writers open and write rows one by one.
+ *  - Step 2: If the number of concurrent writers exceeds the limit, sort the remaining rows on the
+ *            partition and/or bucket column(s). Write rows one by one, and eagerly close each
+ *            writer once its partition and/or bucket is finished.
+ *
+ * The caller is expected to call `writeWithIterator()` instead of `write()` to write records.
+ */
+class DynamicPartitionDataConcurrentWriter(
+    description: WriteJobDescription,
+    taskAttemptContext: TaskAttemptContext,
+    committer: FileCommitProtocol,
+    concurrentOutputWriterSpec: ConcurrentOutputWriterSpec)
+  extends BaseDynamicPartitionDataWriter(description, taskAttemptContext, committer) {
+
+  /** Wrapper class to index a unique concurrent output writer. */
+  private case class WriterIndex(
+    var partitionValues: Option[UnsafeRow],
+    var bucketId: Option[Int])
+
+  /** Wrapper class for status of a unique concurrent output writer. */
+  private class WriterStatus(
+    var outputWriter: OutputWriter,
+    var recordsInFile: Long,
+    var fileCounter: Int)
+
+  /**
+   * State to indicate if we are falling back to sort-based writer.
+   * Because we first try to use concurrent writers, its initial value is false.
+   */
+  private var sorted: Boolean = false
+  private val concurrentWriters = mutable.HashMap[WriterIndex, WriterStatus]()
+  private val currentWriterId = WriterIndex(None, None)
+
+  /**
+   * Release resources for all concurrent output writers.
+   */
+  override protected def releaseResources(): Unit = {
+    currentWriter = null
+    concurrentWriters.values.foreach(status => {
+      if (status.outputWriter != null) {
+        try {
+          status.outputWriter.close()
+        } finally {
+          status.outputWriter = null
+        }
+      }
+    })
+    concurrentWriters.clear()
+  }
 
-      newOutputWriter(currentPartitionValues, currentBucketId)
+  override def write(record: InternalRow): Unit = {
+    val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None
+    val nextBucketId = if (isBucketed) Some(getBucketId(record)) else None

Review comment:
       Do we need to go through these even after `sorted` is set to `true`?
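
As a side note on the question above: one reading of the code is that these per-record lookups are still needed after `sorted` becomes true, because sorting only guarantees that rows with the same partition/bucket values arrive contiguously; the writer still has to read each row's key to notice the boundary where the current file should be closed and the next one opened. A tiny standalone Scala sketch of that boundary detection (names are made up for illustration):

    // Standalone sketch (hypothetical names) of boundary detection over sorted input:
    // even with sorted rows, each record's key is read so the previous writer can be
    // closed eagerly when the key changes.
    object SortedBoundarySketch {
      def main(args: Array[String]): Unit = {
        val sortedRows = Seq(("p=1", "a"), ("p=1", "b"), ("p=2", "c"), ("p=3", "d"))
        var currentKey: Option[String] = None
        sortedRows.foreach { case (key, value) =>
          if (!currentKey.contains(key)) {
            currentKey.foreach(k => println(s"close writer for $k")) // eagerly close the finished writer
            println(s"open writer for $key")
            currentKey = Some(key)
          }
          println(s"  write $value to $key")
        }
        currentKey.foreach(k => println(s"close writer for $k"))
      }
    }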

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
##########
@@ -255,25 +331,180 @@ class DynamicPartitionDataWriter(
       }
       if (isBucketed) {
         currentBucketId = nextBucketId
-        statsTrackers.foreach(_.newBucket(currentBucketId.get))
       }
 
       fileCounter = 0
-      newOutputWriter(currentPartitionValues, currentBucketId)
+      renewCurrentWriter(currentPartitionValues, currentBucketId, closeCurrentWriter = true)
     } else if (description.maxRecordsPerFile > 0 &&
       recordsInFile >= description.maxRecordsPerFile) {
-      // Exceeded the threshold in terms of the number of records per file.
-      // Create a new file by increasing the file counter.
-      fileCounter += 1
-      assert(fileCounter < MAX_FILE_COUNTER,
-        s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")
+      increaseFileCounter(currentPartitionValues, currentBucketId)
+    }
+    writeRecord(record)
+  }
+}
+
+/**
+ * Dynamic partition writer with concurrent writers, meaning multiple concurrent writers are opened
+ * for writing.
+ *
+ * The process has the following steps:
+ *  - Step 1: Maintain a map of output writers, keyed by the partition and/or bucket values. Keep
+ *            all writers open and write rows one by one.
+ *  - Step 2: If the number of concurrent writers exceeds the limit, sort the remaining rows on the
+ *            partition and/or bucket column(s). Write rows one by one, and eagerly close each
+ *            writer once its partition and/or bucket is finished.
+ *
+ * The caller is expected to call `writeWithIterator()` instead of `write()` to write records.
+ */
+class DynamicPartitionDataConcurrentWriter(
+    description: WriteJobDescription,
+    taskAttemptContext: TaskAttemptContext,
+    committer: FileCommitProtocol,
+    concurrentOutputWriterSpec: ConcurrentOutputWriterSpec)
+  extends BaseDynamicPartitionDataWriter(description, taskAttemptContext, committer) {
+
+  /** Wrapper class to index a unique concurrent output writer. */
+  private case class WriterIndex(
+    var partitionValues: Option[UnsafeRow],
+    var bucketId: Option[Int])
+
+  /** Wrapper class for status of a unique concurrent output writer. */
+  private class WriterStatus(
+    var outputWriter: OutputWriter,
+    var recordsInFile: Long,
+    var fileCounter: Int)
+
+  /**
+   * State to indicate if we are falling back to sort-based writer.
+   * Because we first try to use concurrent writers, its initial value is false.
+   */
+  private var sorted: Boolean = false
+  private val concurrentWriters = mutable.HashMap[WriterIndex, WriterStatus]()
+  private val currentWriterId = WriterIndex(None, None)
+
+  /**
+   * Release resources for all concurrent output writers.
+   */
+  override protected def releaseResources(): Unit = {
+    currentWriter = null
+    concurrentWriters.values.foreach(status => {
+      if (status.outputWriter != null) {
+        try {
+          status.outputWriter.close()
+        } finally {
+          status.outputWriter = null
+        }
+      }
+    })
+    concurrentWriters.clear()
+  }
 
-      newOutputWriter(currentPartitionValues, currentBucketId)
+  override def write(record: InternalRow): Unit = {
+    val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None
+    val nextBucketId = if (isBucketed) Some(getBucketId(record)) else None
+
+    if (currentWriterId.partitionValues != nextPartitionValues ||
+      currentWriterId.bucketId != nextBucketId) {
+      // See a new partition or bucket - write to a new partition dir (or a new bucket file).
+      if (currentWriter != null) {
+        if (!sorted) {
+          // Update writer status in concurrent writers map, because the writer is probably needed
+          // again later for writing other rows.
+          updateCurrentWriterStatusInMap()
+        } else {
+          // Remove writer status in concurrent writers map and release current writer resource,
+          // because the writer is not needed any more.
+          concurrentWriters.remove(currentWriterId)
+          releaseCurrentWriter()
+        }
+      }
+
+      if (isBucketed) {
+        currentWriterId.bucketId = nextBucketId
+      }
+      if (isPartitioned && currentWriterId.partitionValues != nextPartitionValues) {
+        currentWriterId.partitionValues = Some(nextPartitionValues.get.copy())
+        if (!concurrentWriters.contains(currentWriterId)) {
+          statsTrackers.foreach(_.newPartition(currentWriterId.partitionValues.get))
+        }
+      }
+      retrieveWriterInMap()
     }
-    val outputRow = getOutputRow(record)
-    currentWriter.write(outputRow)
-    statsTrackers.foreach(_.newRow(outputRow))
-    recordsInFile += 1
+
+    if (description.maxRecordsPerFile > 0 &&
+      recordsInFile >= description.maxRecordsPerFile) {
+      increaseFileCounter(currentWriterId.partitionValues, currentWriterId.bucketId)
+      // Update writer status in concurrent writers map, as a new writer is created.
+      updateCurrentWriterStatusInMap()
+    }
+    writeRecord(record)
+  }
+
+  /**
+   * Write iterator of records with concurrent writers.
+   */
+  override def writeWithIterator(iterator: Iterator[InternalRow]): Unit = {
+    while (iterator.hasNext && !sorted) {
+      write(iterator.next())
+    }
+
+    if (iterator.hasNext) {
+      clearCurrentWriterStatus()
+      val sorter = concurrentOutputWriterSpec.createSorter()
+      val sortIterator = sorter.sort(iterator.asInstanceOf[Iterator[UnsafeRow]])
+      while (sortIterator.hasNext) {
+        write(sortIterator.next())
+      }
+    }
+  }
+
+  /**
+   * Update current writer status in map.
+   */
+  private def updateCurrentWriterStatusInMap(): Unit = {
+    val status = concurrentWriters(currentWriterId)
+    status.outputWriter = currentWriter
+    status.recordsInFile = recordsInFile
+    status.fileCounter = fileCounter
+  }
+
+  /**
+   * Retrieve the writer from the map, or create a new writer if one does not exist.
+   */
+  private def retrieveWriterInMap(): Unit = {
+    if (concurrentWriters.contains(currentWriterId)) {
+      val status = concurrentWriters(currentWriterId)
+      currentWriter = status.outputWriter
+      recordsInFile = status.recordsInFile
+      fileCounter = status.fileCounter
+    } else {
+      fileCounter = 0
+      renewCurrentWriter(
+        currentWriterId.partitionValues,
+        currentWriterId.bucketId,
+        closeCurrentWriter = false)
+      concurrentWriters.put(

Review comment:
       Maybe add an assert before this (`assert(concurrentWriters.size <= concurrentOutputWriterSpec.maxWriters)`), since whether that invariant holds depends on `sorted` and on the `concurrentWriters.remove` calls outside this function?
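
For reference, a simplified standalone Scala sketch (made-up names, not the actual PR code) of where the suggested assertion would sit: just before a new writer is registered, documenting the assumption that removals driven by `sorted` keep the map at or under the limit.

    // Simplified standalone sketch of the suggested assert placement (hypothetical names).
    import scala.collection.mutable

    object AssertPlacementSketch {
      val maxWriters = 4
      val concurrentWriters = mutable.HashMap[String, String]()
      var sorted = false

      def retrieveOrCreateWriter(key: String): String = {
        concurrentWriters.getOrElse(key, {
          // Suggested invariant: by the time one more writer is about to be opened,
          // the map has not already exceeded the configured limit.
          assert(concurrentWriters.size <= maxWriters)
          val writer = s"writer-$key"
          concurrentWriters.put(key, writer)
          if (concurrentWriters.size > maxWriters && !sorted) {
            sorted = true // fall back to sort-based writing for the remaining rows
          }
          writer
        })
      }

      def main(args: Array[String]): Unit = {
        (1 to 5).foreach(i => retrieveOrCreateWriter(s"p=$i"))
        println(s"writers = ${concurrentWriters.size}, sorted = $sorted")
      }
    }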




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org