You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/02/18 14:13:25 UTC

[kyuubi] branch branch-1.7 updated: [KYUUBI #4359] Workaround for SPARK-41448 to keep FileWriterFactory serializable

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new c1f2ebdbc [KYUUBI #4359] Workaround for SPARK-41448 to keep FileWriterFactory serializable
c1f2ebdbc is described below

commit c1f2ebdbca4aa3005b12ef5ccdcac411fa85fd25
Author: Yikf <yi...@apache.org>
AuthorDate: Sat Feb 18 22:12:53 2023 +0800

    [KYUUBI #4359] Workaround for SPARK-41448 to keep FileWriterFactory serializable
    
    ### _Why are the changes needed?_
    
    [SPARK-41448](https://issues.apache.org/jira/browse/SPARK-41448) make consistent MR job IDs in FileBatchWriter and FileFormatWriter in Apache Spark 3.3.2, but it breaks a serializable issue, JobId is non-serializable.
    
    And this pr aims to rewrite `FileWriterFactory` to circumvent the problem
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #4359 from Yikf/FileWriterFactory.
    
    Closes #4359
    
    dd8c90fe [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
    1e5164ec [Yikf] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
    
    Lead-authored-by: Yikf <yi...@apache.org>
    Co-authored-by: Cheng Pan <pa...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
    (cherry picked from commit 4feb83d0f38f0eb3fd39ecf669772ba8d3780e99)
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../connector/hive/write/FileWriterFactory.scala   | 78 ++++++++++++++++++++++
 .../connector/hive/write/HiveBatchWrite.scala      |  9 ++-
 .../spark/connector/hive/write/HiveWrite.scala     |  4 +-
 .../hive/kyuubi/connector/HiveBridgeHelper.scala   |  2 +
 4 files changed, 89 insertions(+), 4 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
new file mode 100644
index 000000000..c8e8f9b69
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.write
+
+import java.util.Date
+
+import org.apache.hadoop.mapred.JobID
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory}
+import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataSingleWriter, SingleDirectoryDataWriter, WriteJobDescription}
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.sparkHadoopWriterUtils
+
+/**
+ * This class is rewritten because of SPARK-42478, which affects Spark 3.3.2
+ */
+case class FileWriterFactory(
+    description: WriteJobDescription,
+    committer: FileCommitProtocol) extends DataWriterFactory {
+
+  private val jobTrackerId = sparkHadoopWriterUtils.createJobTrackerID(new Date)
+
+  override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
+    val taskAttemptContext = createTaskAttemptContext(partitionId)
+    committer.setupTask(taskAttemptContext)
+    if (description.partitionColumns.isEmpty) {
+      new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
+    } else {
+      new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
+    }
+  }
+
+  private def createTaskAttemptContext(partitionId: Int): TaskAttemptContextImpl = {
+    val jobId = createJobID(jobTrackerId, 0)
+    val taskId = new TaskID(jobId, TaskType.MAP, partitionId)
+    val taskAttemptId = new TaskAttemptID(taskId, 0)
+    // Set up the configuration object
+    val hadoopConf = description.serializableHadoopConf.value
+    hadoopConf.set("mapreduce.job.id", jobId.toString)
+    hadoopConf.set("mapreduce.task.id", taskId.toString)
+    hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
+    hadoopConf.setBoolean("mapreduce.task.ismap", true)
+    hadoopConf.setInt("mapreduce.task.partition", 0)
+
+    new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
+  }
+
+  /**
+   * Create a job ID.
+   *
+   * @param jobTrackerID unique job track id
+   * @param id job number
+   * @return a job ID
+   */
+  def createJobID(jobTrackerID: String, id: Int): JobID = {
+    if (id < 0) {
+      throw new IllegalArgumentException("Job number is negative")
+    }
+    new JobID(jobTrackerID, id)
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
index c4e473ff2..625d79d0c 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
@@ -23,12 +23,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}
 import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.execution.datasources.WriteTaskResult
+import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
 import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, toSQLValue, HiveExternalCatalog}
 import org.apache.spark.sql.types.StringType
@@ -47,10 +48,12 @@ class HiveBatchWrite(
     ifPartitionNotExists: Boolean,
     hadoopConf: Configuration,
     fileBatchWrite: FileBatchWrite,
-    externalCatalog: ExternalCatalog) extends BatchWrite with Logging {
+    externalCatalog: ExternalCatalog,
+    description: WriteJobDescription,
+    committer: FileCommitProtocol) extends BatchWrite with Logging {
 
   override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
-    fileBatchWrite.createBatchWriterFactory(info)
+    FileWriterFactory(description, committer)
   }
 
   override def commit(messages: Array[WriterCommitMessage]): Unit = {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
index 486a7aa22..2d72327b4 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
@@ -112,7 +112,9 @@ case class HiveWrite(
       ifPartitionNotExists,
       hadoopConf,
       new FileBatchWrite(job, description, committer),
-      externalCatalog)
+      externalCatalog,
+      description,
+      committer)
   }
 
   private def createWriteJobDescription(
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
index 88b0305dd..1a11790d8 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.kyuubi.connector
 import scala.collection.mutable
 
 import org.apache.spark.SparkContext
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
 import org.apache.spark.rdd.InputFileBlockHolder
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogEvent}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
@@ -43,6 +44,7 @@ object HiveBridgeHelper {
   val hive = org.apache.spark.sql.hive.client.hive
   val logicalExpressions: LogicalExpressions.type = LogicalExpressions
   val hiveClientImpl: HiveClientImpl.type = HiveClientImpl
+  val sparkHadoopWriterUtils: SparkHadoopWriterUtils.type = SparkHadoopWriterUtils
   val catalogV2Util: CatalogV2Util.type = CatalogV2Util
   val hiveTableUtil: HiveTableUtil.type = HiveTableUtil
   val hiveShim: HiveShim.type = HiveShim