You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/08/09 18:26:22 UTC

[38/47] carbondata git commit: [CARBONDATA-2831] Added Support Merge index files read from non transactional table

[CARBONDATA-2831] Added Support Merge index files read from non transactional table

problem : Currently, an SDK read / non-transactional table read from an external
table gives null output when a carbonmergeindex file is present instead of
carbonindex files.

cause : In LatestFilesReadCommittedScope, while taking the snapshot, merge index
files were not considered.

solution: consider the merge index files while taking snapshot

This closes #2610


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7a65ad1b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7a65ad1b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7a65ad1b

Branch: refs/heads/branch-1.4
Commit: 7a65ad1be896c744286b42f23a713b774daee8c9
Parents: 1f82d39
Author: ajantha-bhat <aj...@gmail.com>
Authored: Mon Aug 6 14:15:41 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Aug 9 23:51:17 2018 +0530

----------------------------------------------------------------------
 .../LatestFilesReadCommittedScope.java          | 10 ++++++---
 .../TestNonTransactionalCarbonTable.scala       | 22 +++++++++++++++++++-
 2 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a65ad1b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index f0f7272..fff9c6d 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -119,7 +119,11 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
       index = new LinkedList<>();
     }
     for (String indexPath : index) {
-      indexFileStore.put(indexPath, null);
+      if (indexPath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFileStore.put(indexPath, indexPath.substring(indexPath.lastIndexOf('/') + 1));
+      } else {
+        indexFileStore.put(indexPath, null);
+      }
     }
     return indexFileStore;
   }
@@ -171,9 +175,9 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
             "No Index files are present in the table location :" + carbonFilePath);
       }
       for (int i = 0; i < carbonIndexFiles.length; i++) {
-        // TODO. If Required to support merge index, then this code has to be modified.
         // TODO. Nested File Paths.
-        if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)
+            || carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
           // Get Segment Name from the IndexFile.
           String indexFilePath =
               FileFactory.getUpdatedFilePath(carbonIndexFiles[i].getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a65ad1b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index b92d41d..39f6ddc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -33,7 +33,7 @@ import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
 import org.apache.avro.io.{DecoderFactory, Encoder}
 import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.junit.Assert
 import org.scalatest.BeforeAndAfterAll
@@ -1096,6 +1096,26 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
+  test("test SDK Read with merge index file") {
+    sql("DROP TABLE IF EXISTS normalTable1")
+    sql(
+      "create table if not exists normalTable1(name string, age int, height double) STORED BY " +
+      "'carbondata'")
+    sql(s"""insert into normalTable1 values ("aaaaa", 12, 20)""").show(200, false)
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    val carbonTable = CarbonEnv
+      .getCarbonTable(Option("default"), "normalTable1")(sqlContext.sparkSession)
+    sql("describe formatted normalTable1").show(200, false)
+    val fileLocation = carbonTable.getSegmentPath("0")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$fileLocation' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("aaaaa", 12, 20)))
+    sql("DROP TABLE sdkOutputTable")
+    sql("DROP TABLE normalTable1")
+  }
+
+  // --------------------------------------------- AVRO test cases ---------------------------
   private def WriteFilesWithAvroWriter(rows: Int,
       mySchema: String,
       json: String) = {