You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/11/02 13:00:08 UTC

carbondata git commit: [CARBONDATA-3070] Fix partition load issue when custom location is added.

Repository: carbondata
Updated Branches:
  refs/heads/master 74a2ddee9 -> d62277696


[CARBONDATA-3070] Fix partition load issue when custom location is added.

Problem:
Loading data into a carbon file format table fails when a partition with a custom location has been added.

Reason:
Carbon uses its own filename for each carbondata file; it does not use the filename proposed by Spark.
It also needs to create an extra index file. In the case of a custom partition location, Spark keeps track of the names
of the files it creates and moves them. But Carbon creates and maintains different files, which causes the file-not-found exception.

Solution:
Use a custom commit protocol to manage the commit and the folder location for custom partition locations.

This closes #2873


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d6227769
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d6227769
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d6227769

Branch: refs/heads/master
Commit: d62277696cd19257a50cc956e3e7ff8fad5e651f
Parents: 74a2dde
Author: ravipesala <ra...@gmail.com>
Authored: Mon Oct 29 13:15:00 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Fri Nov 2 18:29:46 2018 +0530

----------------------------------------------------------------------
 .../datasources/SparkCarbonFileFormat.scala     | 87 +++++++++++++++++++-
 .../org/apache/spark/sql/CarbonVectorProxy.java |  3 +
 .../datasource/SparkCarbonDataSourceTest.scala  | 34 ++++++++
 3 files changed, 120 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d6227769/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index cd2035c..8c2f200 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.carbondata.execution.datasources
 
+import java.net.URI
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
@@ -27,6 +29,7 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.memory.MemoryMode
 import org.apache.spark.sql._
 import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
@@ -112,6 +115,13 @@ class SparkCarbonFileFormat extends FileFormat
   }
 
   /**
+   * Add our own protocol to control the commit.
+   */
+  SparkSession.getActiveSession.get.sessionState.conf.setConfString(
+    "spark.sql.sources.commitProtocolClass",
+    "org.apache.spark.sql.carbondata.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
+
+  /**
    * Prepares a write job and returns an [[OutputWriterFactory]].  Client side job preparation is
    * done here.
    */
@@ -125,6 +135,7 @@ class SparkCarbonFileFormat extends FileFormat
     val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
     model.setLoadWithoutConverterStep(true)
     CarbonTableOutputFormat.setLoadModel(conf, model)
+    conf.set(CarbonSQLHadoopMapReduceCommitProtocol.COMMIT_PROTOCOL, "true")
 
     new OutputWriterFactory {
       override def newInstance(
@@ -310,7 +321,6 @@ class SparkCarbonFileFormat extends FileFormat
     vectorizedReader.toBoolean && schema.forall(_.dataType.isInstanceOf[AtomicType])
   }
 
-
   /**
    * Returns whether this format support returning columnar batch or not.
    */
@@ -369,7 +379,7 @@ class SparkCarbonFileFormat extends FileFormat
 
       if (file.filePath.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
         val split = new CarbonInputSplit("null",
-          new Path(file.filePath),
+          new Path(new URI(file.filePath)),
           file.start,
           file.length,
           file.locations,
@@ -380,10 +390,12 @@ class SparkCarbonFileFormat extends FileFormat
         split.setDetailInfo(info)
         info.setBlockSize(file.length)
         // Read the footer offset and set.
-        val reader = FileFactory.getFileHolder(FileFactory.getFileType(file.filePath),
+        val reader = FileFactory.getFileHolder(FileFactory.getFileType(split.getPath.toString),
           broadcastedHadoopConf.value.value)
         val buffer = reader
-          .readByteBuffer(FileFactory.getUpdatedFilePath(file.filePath), file.length - 8, 8)
+          .readByteBuffer(FileFactory.getUpdatedFilePath(split.getPath.toString),
+            file.length - 8,
+            8)
         info.setBlockFooterOffset(buffer.getLong)
         info.setVersionNumber(split.getVersion.number())
         info.setUseMinMaxForPruning(true)
@@ -447,7 +459,74 @@ class SparkCarbonFileFormat extends FileFormat
     }
   }
 
+}
+
+/**
+ * Since carbon writes 2 files carbondata files and index file , but spark cannot understand two
+ * files so added custom protocol to copy the files in case of custom partition location.
+ */
+case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
+  extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
+
+  override def newTaskTempFileAbsPath(
+      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+    val carbonFlow = taskContext.getConfiguration.get(
+      CarbonSQLHadoopMapReduceCommitProtocol.COMMIT_PROTOCOL)
+    val tempPath = super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
+    // Call only in case of carbon flow.
+    if (carbonFlow != null) {
+      // Create subfolder with uuid and write carbondata files
+      val path = new Path(tempPath)
+      val uuid = path.getName.substring(0, path.getName.indexOf("-part-"))
+      new Path(new Path(path.getParent, uuid), path.getName).toString
+    } else {
+      tempPath
+    }
+  }
 
+  override def commitJob(jobContext: JobContext,
+      taskCommits: Seq[FileCommitProtocol.TaskCommitMessage]): Unit = {
+    val carbonFlow = jobContext.getConfiguration.get(
+      CarbonSQLHadoopMapReduceCommitProtocol.COMMIT_PROTOCOL)
+    var updatedTaskCommits = taskCommits
+    // Call only in case of carbon flow.
+    if (carbonFlow != null) {
+      val (allAbsPathFiles, allPartitionPaths) =
+        // spark 2.1 and 2.2 case
+        if (taskCommits.exists(_.obj.isInstanceOf[Map[String, String]])) {
+        (taskCommits.map(_.obj.asInstanceOf[Map[String, String]]), null)
+      } else {
+          // spark 2.3 and above
+        taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
+      }
+      val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
+      val fs = new Path(path).getFileSystem(jobContext.getConfiguration)
+      // Move files from stage directory to actual location.
+      filesToMove.foreach{case (src, dest) =>
+        val srcPath = new Path(src)
+        val name = srcPath.getName
+        // Get uuid from spark's stage filename
+        val uuid = name.substring(0, name.indexOf("-part-"))
+        // List all the files under the uuid location
+        val list = fs.listStatus(new Path(new Path(src).getParent, uuid))
+        // Move all these files to actual folder.
+        list.foreach{ f =>
+          fs.rename(f.getPath, new Path(new Path(dest).getParent, f.getPath.getName))
+        }
+      }
+      updatedTaskCommits = if (allPartitionPaths == null) {
+        taskCommits.map(f => new FileCommitProtocol.TaskCommitMessage(Map.empty))
+      } else {
+        taskCommits.zipWithIndex.map{f =>
+          new FileCommitProtocol.TaskCommitMessage((Map.empty, allPartitionPaths(f._2)))
+        }
+      }
+    }
+    super.commitJob(jobContext, updatedTaskCommits)
+  }
+}
+object CarbonSQLHadoopMapReduceCommitProtocol {
+  val COMMIT_PROTOCOL = "carbon.commit.protocol"
 }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d6227769/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
index c16d381..90e2cc5 100644
--- a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
+++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
@@ -464,6 +464,9 @@ public class CarbonVectorProxy {
     }
 
     public void reset() {
+      if (isConstant) {
+        return;
+      }
       isLoaded = false;
       vector.reset();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d6227769/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index a1a5b8e..7564158 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -1276,6 +1276,40 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     }
   }
 
+  test("test partition issue with add location") {
+    spark.sql("drop table if exists partitionTable_obs")
+    spark.sql("drop table if exists partitionTable_obs_par")
+    spark.sql(s"create table partitionTable_obs (id int,name String,email String) using carbon partitioned by(email) ")
+    spark.sql(s"create table partitionTable_obs_par (id int,name String,email String) using parquet partitioned by(email) ")
+    spark.sql("insert into partitionTable_obs select 1,'huawei','abc'")
+    spark.sql("insert into partitionTable_obs select 1,'huawei','bcd'")
+    spark.sql(s"alter table partitionTable_obs add partition (email='def') location '$warehouse1/test_folder121/'")
+    spark.sql("insert into partitionTable_obs select 1,'huawei','def'")
+
+    spark.sql("insert into partitionTable_obs_par select 1,'huawei','abc'")
+    spark.sql("insert into partitionTable_obs_par select 1,'huawei','bcd'")
+    spark.sql(s"alter table partitionTable_obs_par add partition (email='def') location '$warehouse1/test_folder122/'")
+    spark.sql("insert into partitionTable_obs_par select 1,'huawei','def'")
+
+    checkAnswer(spark.sql("select * from partitionTable_obs"), spark.sql("select * from partitionTable_obs_par"))
+    spark.sql("drop table if exists partitionTable_obs")
+    spark.sql("drop table if exists partitionTable_obs_par")
+  }
+
+  test("test multiple partition  select issue") {
+    spark.sql("drop table if exists t_carbn01b_hive")
+    spark.sql(s"drop table if exists t_carbn01b")
+    spark.sql("create table t_carbn01b_hive(Qty_day_avg INT,Qty_total INT,Sell_price BIGINT,Sell_pricep DOUBLE,Profit DECIMAL(3,2),Item_code String,Item_name String,Outlet_name String,Create_date String,Active_status String,Item_type_cd INT, Update_time TIMESTAMP, Discount_price DOUBLE)  using parquet partitioned by (Active_status,Item_type_cd, Update_time, Discount_price)")
+    spark.sql("create table t_carbn01b(Qty_day_avg INT,Qty_total INT,Sell_price BIGINT,Sell_pricep DOUBLE,Profit DECIMAL(3,2),Item_code String,Item_name String,Outlet_name String,Create_date String,Active_status String,Item_type_cd INT, Update_time TIMESTAMP, Discount_price DOUBLE)  using carbon partitioned by (Active_status,Item_type_cd, Update_time, Discount_price)")
+    spark.sql("insert into t_carbn01b partition(Active_status, Item_type_cd,Update_time,Discount_price) select * from t_carbn01b_hive")
+    spark.sql("alter table t_carbn01b add partition (active_status='xyz',Item_type_cd=12,Update_time=NULL,Discount_price='3000')")
+    spark.sql("insert overwrite table t_carbn01b select 'xyz', 12, 74,3000,20000000,121.5,4.99,2.44,'RE3423ee','dddd', 'ssss','2012-01-02 23:04:05.12', '2012-01-20'")
+    spark.sql("insert overwrite table t_carbn01b_hive select 'xyz', 12, 74,3000,20000000,121.5,4.99,2.44,'RE3423ee','dddd', 'ssss','2012-01-02 23:04:05.12', '2012-01-20'")
+    checkAnswer(spark.sql("select * from t_carbn01b_hive"), spark.sql("select * from t_carbn01b"))
+    spark.sql("drop table if exists t_carbn01b_hive")
+    spark.sql(s"drop table if exists t_carbn01b")
+  }
+
   override protected def beforeAll(): Unit = {
     drop
     createParquetTable