Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/02/18 14:13:05 UTC
[kyuubi] branch master updated: [KYUUBI #4359] Workaround for SPARK-41448 to keep FileWriterFactory serializable
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 4feb83d0f [KYUUBI #4359] Workaround for SPARK-41448 to keep FileWriterFactory serializable
4feb83d0f is described below
commit 4feb83d0f38f0eb3fd39ecf669772ba8d3780e99
Author: Yikf <yi...@apache.org>
AuthorDate: Sat Feb 18 22:12:53 2023 +0800
[KYUUBI #4359] Workaround for SPARK-41448 to keep FileWriterFactory serializable
### _Why are the changes needed?_
[SPARK-41448](https://issues.apache.org/jira/browse/SPARK-41448) makes MR job IDs consistent between FileBatchWriter and FileFormatWriter in Apache Spark 3.3.2, but it introduces a serialization issue: `JobID` is not serializable.
This PR rewrites `FileWriterFactory` to work around the problem.
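A minimal, self-contained sketch of the pitfall and of the workaround pattern used here (the `BrokenFactory`, `SerializableFactory`, and `SerializationCheck` names are illustrative, not part of this PR): a factory holding a Hadoop `JobID` field fails Java serialization, while a factory that keeps only the job tracker id as a `String` and rebuilds the `JobID` on demand round-trips fine.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import org.apache.hadoop.mapred.JobID

// Holds a JobID directly: JobID does not implement java.io.Serializable,
// so shipping this factory from the driver to executors fails at runtime.
case class BrokenFactory(jobId: JobID)

// Holds only the job tracker id string: the factory stays serializable and
// the JobID is re-created where it is needed (per task, on the executor).
case class SerializableFactory(jobTrackerId: String) {
  def createJobID(id: Int): JobID = new JobID(jobTrackerId, id)
}

object SerializationCheck extends App {
  private def roundTrip(obj: AnyRef): Unit = {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    try oos.writeObject(obj) finally oos.close()
  }

  roundTrip(SerializableFactory("20230218221253"))         // ok
  roundTrip(BrokenFactory(new JobID("20230218221253", 0))) // java.io.NotSerializableException
}
```

The rewritten `FileWriterFactory` below follows the second shape: it stores a `jobTrackerId` string and constructs the `JobID` per task inside `createTaskAttemptContext`.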
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4359 from Yikf/FileWriterFactory.
Closes #4359
dd8c90fe [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
1e5164ec [Yikf] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
Lead-authored-by: Yikf <yi...@apache.org>
Co-authored-by: Cheng Pan <pa...@gmail.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../connector/hive/write/FileWriterFactory.scala | 78 ++++++++++++++++++++++
.../connector/hive/write/HiveBatchWrite.scala | 9 ++-
.../spark/connector/hive/write/HiveWrite.scala | 4 +-
.../hive/kyuubi/connector/HiveBridgeHelper.scala | 2 +
4 files changed, 89 insertions(+), 4 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
new file mode 100644
index 000000000..c8e8f9b69
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.write
+
+import java.util.Date
+
+import org.apache.hadoop.mapred.JobID
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory}
+import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataSingleWriter, SingleDirectoryDataWriter, WriteJobDescription}
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.sparkHadoopWriterUtils
+
+/**
+ * This class is rewritten because of SPARK-42478, which affects Spark 3.3.2
+ */
+case class FileWriterFactory(
+    description: WriteJobDescription,
+    committer: FileCommitProtocol) extends DataWriterFactory {
+
+  private val jobTrackerId = sparkHadoopWriterUtils.createJobTrackerID(new Date)
+
+  override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
+    val taskAttemptContext = createTaskAttemptContext(partitionId)
+    committer.setupTask(taskAttemptContext)
+    if (description.partitionColumns.isEmpty) {
+      new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
+    } else {
+      new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
+    }
+  }
+
+  private def createTaskAttemptContext(partitionId: Int): TaskAttemptContextImpl = {
+    val jobId = createJobID(jobTrackerId, 0)
+    val taskId = new TaskID(jobId, TaskType.MAP, partitionId)
+    val taskAttemptId = new TaskAttemptID(taskId, 0)
+    // Set up the configuration object
+    val hadoopConf = description.serializableHadoopConf.value
+    hadoopConf.set("mapreduce.job.id", jobId.toString)
+    hadoopConf.set("mapreduce.task.id", taskId.toString)
+    hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
+    hadoopConf.setBoolean("mapreduce.task.ismap", true)
+    hadoopConf.setInt("mapreduce.task.partition", 0)
+
+    new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
+  }
+
+  /**
+   * Create a job ID.
+   *
+   * @param jobTrackerID unique job track id
+   * @param id job number
+   * @return a job ID
+   */
+  def createJobID(jobTrackerID: String, id: Int): JobID = {
+    if (id < 0) {
+      throw new IllegalArgumentException("Job number is negative")
+    }
+    new JobID(jobTrackerID, id)
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
index c4e473ff2..625d79d0c 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
@@ -23,12 +23,13 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}
import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.execution.datasources.WriteTaskResult
+import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, toSQLValue, HiveExternalCatalog}
import org.apache.spark.sql.types.StringType
@@ -47,10 +48,12 @@ class HiveBatchWrite(
     ifPartitionNotExists: Boolean,
     hadoopConf: Configuration,
     fileBatchWrite: FileBatchWrite,
-    externalCatalog: ExternalCatalog) extends BatchWrite with Logging {
+    externalCatalog: ExternalCatalog,
+    description: WriteJobDescription,
+    committer: FileCommitProtocol) extends BatchWrite with Logging {

   override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
-    fileBatchWrite.createBatchWriterFactory(info)
+    FileWriterFactory(description, committer)
   }

   override def commit(messages: Array[WriterCommitMessage]): Unit = {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
index 486a7aa22..2d72327b4 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
@@ -112,7 +112,9 @@ case class HiveWrite(
       ifPartitionNotExists,
       hadoopConf,
       new FileBatchWrite(job, description, committer),
-      externalCatalog)
+      externalCatalog,
+      description,
+      committer)
   }

   private def createWriteJobDescription(
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
index 88b0305dd..1a11790d8 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.kyuubi.connector
import scala.collection.mutable
import org.apache.spark.SparkContext
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
import org.apache.spark.rdd.InputFileBlockHolder
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogEvent}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
@@ -43,6 +44,7 @@ object HiveBridgeHelper {
   val hive = org.apache.spark.sql.hive.client.hive
   val logicalExpressions: LogicalExpressions.type = LogicalExpressions
   val hiveClientImpl: HiveClientImpl.type = HiveClientImpl
+  val sparkHadoopWriterUtils: SparkHadoopWriterUtils.type = SparkHadoopWriterUtils
   val catalogV2Util: CatalogV2Util.type = CatalogV2Util
   val hiveTableUtil: HiveTableUtil.type = HiveTableUtil
   val hiveShim: HiveShim.type = HiveShim