Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/22 16:44:58 UTC

[16/50] [abbrv] carbondata git commit: [CARBONDATA-2463] concurrent insert requires a separate temp path, which is differentiated by seg_id only

[CARBONDATA-2463] concurrent insert requires a separate temp path, which is differentiated by seg_id only

Issue: for a non-transactional table we do not set any segment id in the load model. The temp location is built as
databaseName_tableName_segmentId_taskId, and during concurrent inserts into the same table each insert must get its own
temp folder; the segment id is the only component that differs between two concurrent inserts, so leaving it unset makes both inserts resolve to the same temp location.

Solution: the insert flow for an external table is the same as for a regular carbon table, and the segment
info is not persisted anywhere, so a dummy unique segment id can be assigned to the load model for a non-transactional table.
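
A rough Scala sketch of the idea (buildTempPath and the literal timestamp values are illustrative only, not CarbonData code; the dummy segment id assignment is the part that mirrors the actual change in CarbonLoadDataCommand further down):

    // Illustrative sketch, assuming the databaseName_tableName_segmentId_taskId layout
    // described above; buildTempPath is a hypothetical helper, not a CarbonData API.
    def buildTempPath(db: String, table: String, segmentId: String, taskId: String): String =
      s"${db}_${table}_${segmentId}_${taskId}"

    // Before the fix: non-transactional loads carried no segment id, so two concurrent
    // inserts into the same table resolved to the same temp folder.
    assert(buildTempPath("default", "sdkOutputTable", "null", "3") ==
           buildTempPath("default", "sdkOutputTable", "null", "3"))

    // After the fix: each load model gets a dummy unique segment id
    // (System.currentTimeMillis().toString in the actual change), so the temp folders differ.
    val insertOne = buildTempPath("default", "sdkOutputTable", "1526389061000", "3")
    val insertTwo = buildTempPath("default", "sdkOutputTable", "1526389061042", "3")
    assert(insertOne != insertTwo)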

This closes #2284


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fc4b7f9b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fc4b7f9b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fc4b7f9b

Branch: refs/heads/branch-1.4
Commit: fc4b7f9b96eebdcaf6cd513d911a21761bbc5a77
Parents: 3d23fa6
Author: rahulforallp <ra...@knoldus.in>
Authored: Tue May 8 17:11:41 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Tue May 15 13:17:15 2018 +0530

----------------------------------------------------------------------
 .../TestNonTransactionalCarbonTable.scala       | 43 +++++++++++++++++++-
 .../management/CarbonLoadDataCommand.scala      |  2 +
 .../sdk/file/CarbonWriterBuilder.java           |  4 +-
 .../sdk/file/CSVCarbonWriterTest.java           |  2 +-
 .../CSVNonTransactionalCarbonWriterTest.java    |  4 +-
 5 files changed, 49 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc4b7f9b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index fb9c862..5ab1c60 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -19,8 +19,10 @@ package org.apache.carbondata.spark.testsuite.createTable
 
 import java.sql.Timestamp
 import java.io.{File, FileFilter, IOException}
-import java.io.{File, FileFilter}
 import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.ExecutionContext.Implicits.global
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.Row
@@ -35,6 +37,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
 
 import org.apache.avro
 import org.apache.commons.lang.CharEncoding
@@ -263,6 +267,43 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS sdkOutputTable")
   }
 
+  test("concurrently insert operation"){
+    cleanTestData()
+    buildTestDataSingleFile()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    // with partition
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("drop table if exists t1")
+    sql("create table if not exists t1 (name string, age int, height double) STORED BY 'org.apache.carbondata.format'")
+    var i =0;
+    while (i<50){
+      sql (s"""insert into t1 values ("aaaaa", 12, 20)""").show(200,false)
+      i = i+1;
+    }
+    checkAnswer(sql("select count(*) from t1"),Seq(Row(50)))
+    val one = Future {
+      sql("insert into sdkOutputTable select * from t1 ")
+    }
+    val two = Future {
+      sql("insert into sdkOutputTable select * from t1 ")
+    }
+
+    Await.result(Future.sequence(Seq(one, two)), Duration(300, TimeUnit.SECONDS))
+
+    checkAnswer(sql("select count(*) from sdkOutputTable"),Seq(Row(103)))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
   test(
     "Read two sdk writer outputs before and after deleting the existing files and creating new " +
     "files with same schema and UUID") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc4b7f9b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 3e306fb..3bef4b6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -272,6 +272,8 @@ case class CarbonLoadDataCommand(
           if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
             FileFactory.mkdirs(metadataDirectoryPath, fileType)
           }
+        } else {
+          carbonLoadModel.setSegmentId(System.currentTimeMillis().toString)
         }
         val partitionStatus = SegmentStatus.SUCCESS
         val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc4b7f9b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 1816539..a01f0d7 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -109,8 +109,8 @@ public class CarbonWriterBuilder {
    * by default it is system time in nano seconds.
    * @return updated CarbonWriterBuilder
    */
-  public CarbonWriterBuilder taskNo(String taskNo) {
-    this.taskNo = taskNo;
+  public CarbonWriterBuilder taskNo(long taskNo) {
+    this.taskNo = String.valueOf(taskNo);
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc4b7f9b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index ba3d3ac..d68d85b 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -257,7 +257,7 @@ public class CSVCarbonWriterTest {
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder()
           .withSchema(new Schema(fields))
-          .isTransactionalTable(true).taskNo("5")
+          .isTransactionalTable(true).taskNo(5)
           .outputPath(path);
 
       CarbonWriter writer = builder.buildWriterForCSVInput();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc4b7f9b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
index 32fe6e8..19b0a42 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
@@ -96,7 +96,7 @@ public class CSVNonTransactionalCarbonWriterTest {
           .withSchema(schema)
           .isTransactionalTable(false)
           .uniqueIdentifier(System.currentTimeMillis())
-          .taskNo(Long.toString(System.nanoTime()))
+          .taskNo(System.nanoTime())
           .outputPath(path);
       if (sortColumns != null) {
         builder = builder.sortBy(sortColumns);
@@ -160,7 +160,7 @@ public class CSVNonTransactionalCarbonWriterTest {
           .withSchema(new Schema(fields))
           .uniqueIdentifier(System.currentTimeMillis())
           .isTransactionalTable(false)
-          .taskNo(Long.toString(System.nanoTime()))
+          .taskNo(System.nanoTime())
           .outputPath(path);
 
       CarbonWriter writer = builder.buildWriterForCSVInput();
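
For reference, a minimal Scala sketch of how the updated SDK builder is called after this change; the schema construction, imports, and output path are assumptions for illustration, while the builder chain follows the calls exercised in the tests above. Each concurrent writer now passes a distinct long task number, for example System.nanoTime(), instead of a string:

    import org.apache.carbondata.core.metadata.datatype.DataTypes
    import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}

    // Placeholder two-column schema and output path for the sketch.
    val fields = Array(new Field("name", DataTypes.STRING), new Field("age", DataTypes.INT))
    val writer = CarbonWriter.builder()
      .withSchema(new Schema(fields))
      .isTransactionalTable(false)
      .uniqueIdentifier(System.currentTimeMillis())
      .taskNo(System.nanoTime())   // taskNo now takes a long instead of a String
      .outputPath("/tmp/sdk_output")
      .buildWriterForCSVInput()

    writer.write(Array("robot", "24"))   // CSV input: one string per field
    writer.close()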