You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2020/08/10 01:55:59 UTC

[carbondata] branch master updated: [CARBONDATA-3940] Remove "spark.sql.sources.commitProtocolClass" configuration from SparkCarbonTableFormat

This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new e6850d5  [CARBONDATA-3940] Remove "spark.sql.sources.commitProtocolClass" configuration from SparkCarbonTableFormat
e6850d5 is described below

commit e6850d59773c237571405950a3f8aaa501d3e716
Author: haomarch <ma...@126.com>
AuthorDate: Thu Aug 6 15:01:18 2020 +0800

    [CARBONDATA-3940] Remove "spark.sql.sources.commitProtocolClass" configuration from SparkCarbonTableFormat
    
    Why is this PR needed?
    During the load process, commitTask fails with high probability.
    The exception stack shows that it was throwed by HadoopMapReduceCommitProtocol, not CarbonSQLHadoopMapMapReduceCommitProtocol,
    implying that there is class init error during the initializing of "Committer".
    It should have been initialized as CarbonSQLHadoopMapMapReduceCommitProtocol, but was incorrectly initialized to HadoopMapReduceCommitProtocol.
    
    What changes were proposed in this PR?
    Init the committer to be CarbonSQLHadoopMapMapReduceCommitProtocol directly
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3883
---
 .../management/CarbonInsertIntoHadoopFsRelationCommand.scala     | 9 +++------
 .../spark/sql/execution/datasources/SparkCarbonTableFormat.scala | 4 ----
 2 files changed, 3 insertions(+), 10 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala
index b8253b9..eb19440 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter, FileIndex, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.{CarbonSQLHadoopMapReduceCommitProtocol, FileFormat, FileFormatWriter, FileIndex, PartitioningUtils, SparkCarbonTableFormat}
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -104,11 +104,8 @@ case class CarbonInsertIntoHadoopFsRelationCommand(
     val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
                                     staticPartitions.size < partitionColumns.length
 
-    val committer = FileCommitProtocol.instantiate(
-      sparkSession.sessionState.conf.fileCommitProtocolClass,
-      jobId = java.util.UUID.randomUUID().toString,
-      outputPath = outputPath.toString,
-      dynamicPartitionOverwrite = dynamicPartitionOverwrite)
+    val committer = CarbonSQLHadoopMapReduceCommitProtocol(java.util.UUID.randomUUID().toString,
+      outputPath.toString, dynamicPartitionOverwrite)
 
     val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index ecb8547..726523f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -70,10 +70,6 @@ with Serializable {
     None
   }
 
-  SparkSession.getActiveSession.get.sessionState.conf.setConfString(
-    "spark.sql.sources.commitProtocolClass",
-    "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
-
   override def prepareWrite(
       sparkSession: SparkSession,
       job: Job,