You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/05/15 07:49:57 UTC
carbondata git commit: [CARBONDATA-2463] concurrent insert requires
a separate temp path, which is differentiated only by seg_id
Repository: carbondata
Updated Branches:
refs/heads/master 3d23fa693 -> fc4b7f9b9
[CARBONDATA-2463] concurrent insert requires a separate temp path, which is differentiated only by seg_id
Issue: for non-transactional tables we do not set any segment ID in the load model.
The temp location is named databasename_tablename_segId_taskId. During concurrent inserts into the same table,
each insert must create its temp folder at a separate location, and the segment ID is the only part of that name that can differ between two concurrent inserts.
Because no segment ID is set, both inserts resolve to the same temp folder and interfere with each other.
Solution: the insert flow for an external table is the same as for a carbon table, and the segment info is not stored anywhere,
so a dummy unique segment ID can be assigned to the load model for non-transactional tables.
This closes #2284
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fc4b7f9b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fc4b7f9b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fc4b7f9b
Branch: refs/heads/master
Commit: fc4b7f9b96eebdcaf6cd513d911a21761bbc5a77
Parents: 3d23fa6
Author: rahulforallp <ra...@knoldus.in>
Authored: Tue May 8 17:11:41 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Tue May 15 13:17:15 2018 +0530
----------------------------------------------------------------------
.../TestNonTransactionalCarbonTable.scala | 43 +++++++++++++++++++-
.../management/CarbonLoadDataCommand.scala | 2 +
.../sdk/file/CarbonWriterBuilder.java | 4 +-
.../sdk/file/CSVCarbonWriterTest.java | 2 +-
.../CSVNonTransactionalCarbonWriterTest.java | 4 +-
5 files changed, 49 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc4b7f9b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index fb9c862..5ab1c60 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -19,8 +19,10 @@ package org.apache.carbondata.spark.testsuite.createTable
import java.sql.Timestamp
import java.io.{File, FileFilter, IOException}
-import java.io.{File, FileFilter}
import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.Row
@@ -35,6 +37,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
import org.apache.avro
import org.apache.commons.lang.CharEncoding
@@ -263,6 +267,43 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS sdkOutputTable")
}
+ test("concurrently insert operation"){
+ cleanTestData()
+ buildTestDataSingleFile()
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ // with partition
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY
+ |'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ sql("drop table if exists t1")
+ sql("create table if not exists t1 (name string, age int, height double) STORED BY 'org.apache.carbondata.format'")
+ var i =0;
+ while (i<50){
+ sql (s"""insert into t1 values ("aaaaa", 12, 20)""").show(200,false)
+ i = i+1;
+ }
+ checkAnswer(sql("select count(*) from t1"),Seq(Row(50)))
+ val one = Future {
+ sql("insert into sdkOutputTable select * from t1 ")
+ }
+ val two = Future {
+ sql("insert into sdkOutputTable select * from t1 ")
+ }
+
+ Await.result(Future.sequence(Seq(one, two)), Duration(300, TimeUnit.SECONDS))
+
+ checkAnswer(sql("select count(*) from sdkOutputTable"),Seq(Row(103)))
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
test(
"Read two sdk writer outputs before and after deleting the existing files and creating new " +
"files with same schema and UUID") {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc4b7f9b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 3e306fb..3bef4b6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -272,6 +272,8 @@ case class CarbonLoadDataCommand(
if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
FileFactory.mkdirs(metadataDirectoryPath, fileType)
}
+ } else {
+ carbonLoadModel.setSegmentId(System.currentTimeMillis().toString)
}
val partitionStatus = SegmentStatus.SUCCESS
val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc4b7f9b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 1816539..a01f0d7 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -109,8 +109,8 @@ public class CarbonWriterBuilder {
* by default it is system time in nano seconds.
* @return updated CarbonWriterBuilder
*/
- public CarbonWriterBuilder taskNo(String taskNo) {
- this.taskNo = taskNo;
+ public CarbonWriterBuilder taskNo(long taskNo) {
+ this.taskNo = String.valueOf(taskNo);
return this;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc4b7f9b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index ba3d3ac..d68d85b 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -257,7 +257,7 @@ public class CSVCarbonWriterTest {
try {
CarbonWriterBuilder builder = CarbonWriter.builder()
.withSchema(new Schema(fields))
- .isTransactionalTable(true).taskNo("5")
+ .isTransactionalTable(true).taskNo(5)
.outputPath(path);
CarbonWriter writer = builder.buildWriterForCSVInput();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc4b7f9b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
index 32fe6e8..19b0a42 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
@@ -96,7 +96,7 @@ public class CSVNonTransactionalCarbonWriterTest {
.withSchema(schema)
.isTransactionalTable(false)
.uniqueIdentifier(System.currentTimeMillis())
- .taskNo(Long.toString(System.nanoTime()))
+ .taskNo(System.nanoTime())
.outputPath(path);
if (sortColumns != null) {
builder = builder.sortBy(sortColumns);
@@ -160,7 +160,7 @@ public class CSVNonTransactionalCarbonWriterTest {
.withSchema(new Schema(fields))
.uniqueIdentifier(System.currentTimeMillis())
.isTransactionalTable(false)
- .taskNo(Long.toString(System.nanoTime()))
+ .taskNo(System.nanoTime())
.outputPath(path);
CarbonWriter writer = builder.buildWriterForCSVInput();