Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/11/25 05:07:09 UTC

[carbondata] branch master updated: [CARBONDATA-4029] Fix oldTimeStamp issue in alter table add segment query.

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d2df6d1  [CARBONDATA-4029] Fix oldTimeStamp issue in alter table add segment query.
d2df6d1 is described below

commit d2df6d1c03d1b9ced190ff265f55ac65b80de772
Author: Karan980 <ka...@gmail.com>
AuthorDate: Tue Nov 24 19:13:57 2020 +0530

    [CARBONDATA-4029] Fix oldTimeStamp issue in alter table add segment query.
    
    Why is this PR needed?
    Earlier, the timestamp embedded in the names of CarbonData files was in nanoseconds; it is now written in milliseconds. When an old SDK-written segment is added to a table through the alter table add segment query, its blocks are treated as invalid because their file names still carry the nanosecond timestamp.
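
    To make the file-name layout concrete, here is a minimal Java sketch. It assumes a data-file
    name of the rough form part-<part>-<task>_batchno<batch>-<bucket>-<segmentId>-<timestamp>.carbondata;
    the example values are hypothetical and not taken from this commit, and the exact layout can
    vary between versions:

        public class FileNameTokensSketch {
          public static void main(String[] args) {
            String fileName = "part-0-0_batchno0-0-null-1606195437386.carbondata";
            String[] tokens = fileName.split("-");
            // The second-to-last token is the segment id; SDK-written files carry the
            // literal string "null" here because they are written outside a table segment.
            System.out.println("segment id: " + tokens[tokens.length - 2]);
            // The last token holds the writer timestamp plus the file extension. Old SDK
            // files carried a System.nanoTime() value here rather than milliseconds, which
            // is what tripped the update validation during add segment.
            System.out.println("timestamp token: " + tokens[tokens.length - 1]);
          }
        }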
    
    What changes were proposed in this PR?
    Removed the update validation for SDK-written files; a condensed sketch of the new guard follows.
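
    In effect, the block-invalidation check is now guarded by the segment id parsed from the
    file path. A condensed restatement of that guard (identifiers follow the
    CarbonTableInputFormat hunk below):

        // SDK-written segments have "null" as segment id, so they cannot have been
        // produced by an update/delete and need no invalid-block validation.
        String segmentId = getSegmentIdFromFilePath(inputSplit.getFilePath());
        if (!segmentId.equalsIgnoreCase("null")
            && CarbonUtil.isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getFilePath(),
                invalidBlockVOForSegmentId, updateStatusManager)) {
          continue;  // skip blocks invalidated by an earlier update or delete
        }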
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4024
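
    For context, the new test in AddSegmentTestCase exercises the scenario end to end: it writes
    an external segment with the SDK using a nanosecond uniqueIdentifier (which the
    CarbonWriterBuilder change below now preserves instead of overwriting with
    System.currentTimeMillis()), attaches the folder via alter table add segment, and then runs a
    delete on the table. A minimal Java sketch of that SDK write (paths and row values are
    placeholders, not taken from the test resources):

        import org.apache.carbondata.sdk.file.CarbonWriter;

        public class SdkSegmentWriteSketch {
          public static void main(String[] args) throws Exception {
            CarbonWriter writer = CarbonWriter.builder()
                .outputPath("/tmp/external_segment")                     // placeholder output folder
                .withSchemaFile("/tmp/add_segment_test/Metadata/schema") // placeholder schema file
                .uniqueIdentifier(System.nanoTime())  // kept as-is by buildLoadModel() after this fix
                .writtenBy("AddSegmentSketch")
                .withCsvInput()
                .build();
            // Each row is a String[] of CSV fields matching the table schema (hypothetical values).
            writer.write(new String[]{"11", "arvind", "SE", "2007-01-17 00:00:00", "1",
                "developer", "10", "network", "928478", "2016-12-01 00:00:00", "2016-11-30",
                "17", "26", "2836"});
            writer.close();
          }
        }

    The written folder can then be attached with
    alter table <table> add segment options('path'='<external_segment_path>', 'format'='carbon').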
---
 .../hadoop/api/CarbonTableInputFormat.java         | 20 ++++++++++-
 .../testsuite/addsegment/AddSegmentTestCase.scala  | 42 ++++++++++++++++++++++
 .../carbondata/sdk/file/CarbonWriterBuilder.java   |  4 ++-
 3 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index e3aacc0..a2c162e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -66,6 +66,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 import com.google.common.collect.Sets;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -368,7 +369,10 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       if (isIUDTable) {
         // In case IUD is not performed in this table avoid searching for
         // invalidated blocks.
-        if (CarbonUtil
+        String segmentId = getSegmentIdFromFilePath(inputSplit.getFilePath());
+        // SDK-written segments have "null" as their segment id. For such segments there is
+        // no need to run the update validation, because they were not created by an update operation.
+        if (!segmentId.equalsIgnoreCase("null") && CarbonUtil
             .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getFilePath(),
                 invalidBlockVOForSegmentId, updateStatusManager)) {
           continue;
@@ -572,4 +576,18 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   public void setReadCommittedScope(ReadCommittedScope readCommittedScope) {
     this.readCommittedScope = readCommittedScope;
   }
+
+  public String getSegmentIdFromFilePath(String filePath) {
+    String tempFilePath = filePath.replace(
+        CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR);
+    int fileNameStartIndex = tempFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
+    String fileName = tempFilePath.substring(fileNameStartIndex);
+    if (fileName != null && !fileName.isEmpty()) {
+      String[] pathElements = fileName.split(CarbonCommonConstants.DASH);
+      if (ArrayUtils.isNotEmpty(pathElements)) {
+        return pathElements[pathElements.length - 2];
+      }
+    }
+    return CarbonCommonConstants.INVALID_SEGMENT_ID;
+  }
 }
\ No newline at end of file
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index ebb1efd..dddafd4 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -914,6 +914,48 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop table $tableName")
   }
 
+  test("Test add segment by sdk written segment having timestamp in nanoseconds") {
+    val tableName = "add_segment_test"
+    sql(s"drop table if exists $tableName")
+    sql(
+      s"""
+         | CREATE TABLE $tableName (empno int, empname string, designation String, doj Timestamp,
+         | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+         | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+         | utilization int,salary int)
+         | STORED AS carbondata
+         |""".stripMargin)
+
+    val externalSegmentPath = storeLocation + "/" + "external_segment"
+    FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
+
+    // write into external segment folder
+    val schemaFilePath = s"$storeLocation/$tableName/Metadata/schema"
+    val writer = CarbonWriter.builder
+      .outputPath(externalSegmentPath)
+      .withSchemaFile(schemaFilePath)
+      .uniqueIdentifier(System.nanoTime())
+      .writtenBy("AddSegmentTestCase")
+      .withCsvInput()
+      .build()
+    val source = Source.fromFile(s"$resourcesPath/data.csv")
+    var count = 0
+    for (line <- source.getLines()) {
+      if (count != 0) {
+        writer.write(line.split(","))
+      }
+      count = count + 1
+    }
+    writer.close()
+
+    sql(s"alter table $tableName add segment " +
+      s"options('path'='$externalSegmentPath', 'format'='carbon')").collect()
+    sql(s"delete from $tableName where empno = 12").collect()
+    checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(9)))
+    FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
+    sql(s"drop table $tableName")
+  }
+
   def copy(oldLoc: String, newLoc: String): Unit = {
     val oldFolder = FileFactory.getCarbonFile(oldLoc)
     FileFactory.mkdirs(newLoc, FileFactory.getConfiguration)
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 4d5ed07..fd7a5b4 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -930,7 +930,9 @@ public class CarbonWriterBuilder {
 
   public CarbonLoadModel buildLoadModel(Schema carbonSchema)
       throws IOException, InvalidLoadOptionException {
-    timestamp = System.currentTimeMillis();
+    if (timestamp == 0) {
+      timestamp = System.currentTimeMillis();
+    }
     // validate long_string_column
     Set<String> longStringColumns = new HashSet<>();
     if (options != null && options.get(CarbonCommonConstants.LONG_STRING_COLUMNS) != null) {