Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/22 01:34:43 UTC

[42/50] [abbrv] carbondata git commit: [CARBONDATA-2617] Invalid tuple-id and block id getting formed for Non partition table

[CARBONDATA-2617] Invalid tuple-id and block id getting formed for Non partition table

Problem
An invalid tuple id and block id are formed for a non-partition table.

Analysis
While creating a partition table, a segment file is written to the Metadata folder under the table structure. This was introduced during development of the partition table feature: at that time the segment file was written only for partition tables, and its presence was used in the code to distinguish partition tables from non-partition tables. The code was later changed to write the segment file for both partition and non-partition tables, but the distinguishing check was never updated, so non-partition tables now take the partition-table code path and form incorrect block and tuple ids.
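
Concretely, every call site that treated the presence of a segment file as a proxy for "this is a partition table" now asks the table metadata directly. A minimal before/after sketch, condensed from the AbstractQueryExecutor change in this diff (the new tests below pin the expected id shapes: 0/0/0-0_batchno0-... for non-partition tables, c3=aa/0/0-... for partition tables):

    // Before (broken): presence of a segment file stood in for "partition
    // table". Segment files are now written for every table, so non-partition
    // tables also took the partition branch:
    //   boolean isPartition = segment.getSegmentFileName() != null;
    // After (fixed): ask the table metadata directly:
    boolean isPartition = queryModel.getTable().isHivePartitionTable();
    if (isPartition) {
      blockExecutionInfo.setBlockId(CarbonTablePath.getShortBlockIdForPartitionTable(blockId));
    } else {
      blockExecutionInfo.setBlockId(CarbonTablePath.getShortBlockId(blockId));
    }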

Fix
Modify the logic that distinguishes partitioned from non-partitioned tables: check the table's partition flag (isHivePartitionTable) directly instead of the presence of a segment file. This PR applies that change across all affected call sites, as sketched below.
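
For example, block-path resolution now keys off the partition flag. The hypothetical helper below condenses the two layouts for illustration only: the partition branch follows getTableBlockPath in the first hunk of this diff, and the non-partition layout is reconstructed from the new test assertions (.../Fact/Part0/Segment_0), not quoted from the method body:

    // Illustrative helper, not CarbonData API. Shows how the block path
    // diverges once the partition flag is passed correctly.
    static String resolveBlockPath(String tablePath, String partField,
        String segmentId, boolean isPartitionTable) {
      if (isPartitionTable) {
        // Partition table: the part field from the tuple id (e.g. "c3=aa")
        // maps directly onto the partition directory; "#" separates levels.
        return tablePath + "/" + partField.replace("#", "/");
      }
      // Non-partition table: legacy layout, e.g. <tablePath>/Fact/Part0/Segment_0
      // (layout taken from the assertions in DeleteCarbonTableTestCase below).
      return tablePath + "/Fact/Part" + partField + "/Segment_" + segmentId;
    }

The new tests assert exactly this split: the partition table resolves to .../dest_tuple_part/c3=aa, while the non-partition table resolves to .../dest_tuple/Fact/Part0/Segment_0.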

This closes #2385


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0e1d550e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0e1d550e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0e1d550e

Branch: refs/heads/carbonstore
Commit: 0e1d550e8dacba798e9ffbdda25c4388e8933632
Parents: dc53dee
Author: rahul <ra...@knoldus.in>
Authored: Tue Jun 19 19:23:26 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Wed Jun 20 16:37:23 2018 +0530

----------------------------------------------------------------------
 .../core/mutate/CarbonUpdateUtil.java           |  4 +-
 .../executor/impl/AbstractQueryExecutor.java    |  4 +-
 .../SegmentUpdateStatusManager.java             | 20 ++---
 .../apache/carbondata/core/util/CarbonUtil.java |  4 +-
 .../iud/DeleteCarbonTableTestCase.scala         | 83 ++++++++++++++++++++
 .../command/mutation/DeleteExecution.scala      |  6 +-
 6 files changed, 100 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e1d550e/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 40d498c..8627bdb 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -81,10 +81,10 @@ public class CarbonUpdateUtil {
   /**
    * Returns block path from tuple id
    */
-  public static String getTableBlockPath(String tid, String tablePath, boolean isSegmentFile) {
+  public static String getTableBlockPath(String tid, String tablePath, boolean isPartitionTable) {
     String partField = getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID);
     // If it has segment file then partfield can be appended directly to table path
-    if (isSegmentFile) {
+    if (isPartitionTable) {
       return tablePath + CarbonCommonConstants.FILE_SEPARATOR + partField.replace("#", "/");
     }
     String part = CarbonTablePath.addPartPrefix(partField);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e1d550e/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 2bbe75c..f365045 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -296,8 +296,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     String blockId = CarbonUtil
         .getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segment.getSegmentNo(),
             queryModel.getTable().getTableInfo().isTransactionalTable(),
-            segment.getSegmentFileName() != null);
-    if (segment.getSegmentFileName() != null) {
+            queryModel.getTable().isHivePartitionTable());
+    if (queryModel.getTable().isHivePartitionTable()) {
       blockExecutionInfo.setBlockId(CarbonTablePath.getShortBlockIdForPartitionTable(blockId));
     } else {
       blockExecutionInfo.setBlockId(CarbonTablePath.getShortBlockId(blockId));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e1d550e/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index eb850e4..1b43b65 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -69,6 +69,7 @@ public class SegmentUpdateStatusManager {
   private LoadMetadataDetails[] segmentDetails;
   private SegmentUpdateDetails[] updateDetails;
   private Map<String, SegmentUpdateDetails> blockAndDetailsMap;
+  private boolean isPartitionTable;
 
   public SegmentUpdateStatusManager(CarbonTable table,
       LoadMetadataDetails[] segmentDetails) {
@@ -77,6 +78,7 @@ public class SegmentUpdateStatusManager {
     // on latest file status.
     this.segmentDetails = segmentDetails;
     updateDetails = readLoadMetadata();
+    isPartitionTable = table.isHivePartitionTable();
     populateMap();
   }
 
@@ -96,6 +98,7 @@ public class SegmentUpdateStatusManager {
     } else {
       updateDetails = new SegmentUpdateDetails[0];
     }
+    isPartitionTable = table.isHivePartitionTable();
     populateMap();
   }
 
@@ -246,29 +249,22 @@ public class SegmentUpdateStatusManager {
    * @throws Exception
    */
   public String[] getDeleteDeltaFilePath(String blockFilePath, String segmentId) throws Exception {
-    String segmentFile = null;
-    for (LoadMetadataDetails segmentDetail : segmentDetails) {
-      if (segmentDetail.getLoadName().equals(segmentId)) {
-        segmentFile = segmentDetail.getSegmentFile();
-        break;
-      }
-    }
     String blockId =
-        CarbonUtil.getBlockId(identifier, blockFilePath, segmentId, true, segmentFile != null);
+        CarbonUtil.getBlockId(identifier, blockFilePath, segmentId, true, isPartitionTable);
     String tupleId;
-    if (segmentFile != null) {
+    if (isPartitionTable) {
       tupleId = CarbonTablePath.getShortBlockIdForPartitionTable(blockId);
     } else {
       tupleId = CarbonTablePath.getShortBlockId(blockId);
     }
-    return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT, segmentFile)
+    return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
         .toArray(new String[0]);
   }
 
   /**
    * Returns all delta file paths of specified block
    */
-  private List<String> getDeltaFiles(String tupleId, String extension, String segmentFile)
+  private List<String> getDeltaFiles(String tupleId, String extension)
       throws Exception {
     String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
     String completeBlockName = CarbonTablePath.addDataPartPrefix(
@@ -276,7 +272,7 @@ public class SegmentUpdateStatusManager {
             + CarbonCommonConstants.FACT_FILE_EXT);
 
     String blockPath;
-    if (segmentFile != null) {
+    if (isPartitionTable) {
       blockPath = identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR
           + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID)
           .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e1d550e/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 1f6c697..4e2c16f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2977,13 +2977,13 @@ public final class CarbonUtil {
    * @return
    */
   public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath,
-      String segmentId, boolean isTransactionalTable, boolean hasSegmentFile) {
+      String segmentId, boolean isTransactionalTable, boolean isPartitionTable) {
     String blockId;
     String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length());
     String tablePath = identifier.getTablePath();
 
     if (filePath.startsWith(tablePath)) {
-      if (!isTransactionalTable || !hasSegmentFile) {
+      if (!isTransactionalTable || !isPartitionTable) {
         blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
             + CarbonCommonConstants.FILE_SEPARATOR + blockName;
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e1d550e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index 1fbddb0..64aae1d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -18,12 +18,16 @@ package org.apache.carbondata.spark.testsuite.iud
 
 import java.io.File
 
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.test.Spark2TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{CarbonEnv, Row, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
 
@@ -120,6 +124,7 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
       Seq(Row(2), Row(3),Row(4), Row(5))
     )
   }
+
   test("partition delete data from  carbon table[where clause ]") {
     sql("""drop table if exists iud_db.dest""")
     sql("""create table iud_db.dest (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) STORED BY 'org.apache.carbondata.format'""")
@@ -214,6 +219,84 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table update_status_files")
   }
 
+  test("tuple-id for partition table ") {
+    sql("drop table if exists iud_db.dest_tuple_part")
+    sql(
+      """create table iud_db.dest_tuple_part (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) STORED BY 'org.apache.carbondata.format'""".stripMargin)
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db.dest_tuple_part""".stripMargin)
+    sql("drop table if exists iud_db.dest_tuple")
+    sql(
+      """create table iud_db.dest_tuple (c1 string,c2 int,c5 string,c3 string) STORED BY 'org.apache.carbondata.format'""".stripMargin)
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db.dest_tuple""")
+
+    val dataframe_part = sql("select getTupleId() as tupleId from iud_db.dest_tuple_part").collect()
+    val listOfTupleId_part = dataframe_part.map(df => df.get(0).toString).sorted
+    assert(listOfTupleId_part(0).startsWith("c3=aa/0/0-100100000100001_batchno0-0-0-") &&
+           listOfTupleId_part(0).endsWith("/0/0/0"))
+    assert(listOfTupleId_part(1).startsWith("c3=bb/0/0-100100000100002_batchno0-0-0-") &&
+           listOfTupleId_part(1).endsWith("/0/0/0"))
+    assert(listOfTupleId_part(2).startsWith("c3=cc/0/0-100100000100003_batchno0-0-0-") &&
+           listOfTupleId_part(2).endsWith("/0/0/0"))
+    assert(listOfTupleId_part(3).startsWith("c3=dd/0/0-100100000100004_batchno0-0-0-") &&
+           listOfTupleId_part(3).endsWith("/0/0/0"))
+    assert(listOfTupleId_part(4).startsWith("c3=ee/0/0-100100000100005_batchno0-0-0-") &&
+           listOfTupleId_part(4).endsWith("/0/0/0"))
+
+    val dataframe = sql("select getTupleId() as tupleId from iud_db.dest_tuple")
+    val listOfTupleId = dataframe.collect().map(df => df.get(0).toString).sorted
+    assert(
+      listOfTupleId(0).startsWith("0/0/0-0_batchno0-0-0-") && listOfTupleId(0).endsWith("/0/0/0"))
+    assert(
+      listOfTupleId(1).startsWith("0/0/0-0_batchno0-0-0-") && listOfTupleId(1).endsWith("/0/0/1"))
+    assert(
+      listOfTupleId(2).startsWith("0/0/0-0_batchno0-0-0-") && listOfTupleId(2).endsWith("/0/0/2"))
+    assert(
+      listOfTupleId(3).startsWith("0/0/0-0_batchno0-0-0-") && listOfTupleId(3).endsWith("/0/0/3"))
+    assert(
+      listOfTupleId(4).startsWith("0/0/0-0_batchno0-0-0-") && listOfTupleId(4).endsWith("/0/0/4"))
+
+    val carbonTable_part = CarbonEnv.getInstance(Spark2TestQueryExecutor.spark).carbonMetastore
+      .lookupRelation(Option("iud_db"), "dest_tuple_part")(Spark2TestQueryExecutor.spark)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonTable = CarbonEnv.getInstance(Spark2TestQueryExecutor.spark).carbonMetastore
+      .lookupRelation(Option("iud_db"), "dest_tuple")(Spark2TestQueryExecutor.spark)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonDataFilename = new File(carbonTable.getTablePath + "/Fact/Part0/Segment_0/")
+      .listFiles().filter(fn => fn.getName.endsWith(".carbondata"))
+    val blockId = CarbonUtil.getBlockId(carbonTable.getAbsoluteTableIdentifier,
+      carbonDataFilename(0).getAbsolutePath,
+      "0",
+      carbonTable.isTransactionalTable,
+      carbonTable.isHivePartitionTable)
+
+    assert(blockId.startsWith("Part0/Segment_0/part-0-0_batchno0-0-0-"))
+    val carbonDataFilename_part = new File(carbonTable_part.getTablePath + "/c3=aa").listFiles()
+      .filter(fn => fn.getName.endsWith(".carbondata"))
+    val blockId_part = CarbonUtil.getBlockId(carbonTable.getAbsoluteTableIdentifier,
+      carbonDataFilename_part(0).getAbsolutePath,
+      "0",
+      carbonTable.isTransactionalTable,
+      carbonTable.isHivePartitionTable)
+    assert(blockId_part.startsWith("Part0/Segment_0/part-0-100100000100001_batchno0-0-0-"))
+
+    val tableBlockPath = CarbonUpdateUtil
+      .getTableBlockPath(listOfTupleId(0),
+        carbonTable.getTablePath,
+        carbonTable.isHivePartitionTable)
+    val tableBl0ckPath_part = CarbonUpdateUtil
+      .getTableBlockPath(listOfTupleId_part(0),
+        carbonTable_part.getTablePath,
+        carbonTable_part.isHivePartitionTable)
+    assert(tableBl0ckPath_part.endsWith("iud_db.db/dest_tuple_part/c3=aa"))
+    assert(tableBlockPath.endsWith("iud_db.db/dest_tuple/Fact/Part0/Segment_0"))
+
+    sql("drop table if exists iud_db.dest_tuple_part")
+    sql("drop table if exists iud_db.dest_tuple")
+
+  }
 
   override def afterAll {
     sql("use default")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e1d550e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 127e1b1..df3b961 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -134,7 +134,7 @@ object DeleteExecution {
                        groupedRows.toIterator,
                        timestamp,
                        rowCountDetailsVO,
-                       segmentFile)
+                       carbonTable.isHivePartitionTable)
           }
           result
         }
@@ -222,7 +222,7 @@ object DeleteExecution {
         iter: Iterator[Row],
         timestamp: String,
         rowCountDetailsVO: RowCountDetailsVO,
-        segmentFile: String
+        isPartitionTable: Boolean
     ): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))] = {
 
       val result = new DeleteDelataResultImpl()
@@ -258,7 +258,7 @@ object DeleteExecution {
             countOfRows = countOfRows + 1
           }
 
-          val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, tablePath, segmentFile != null)
+          val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, tablePath, isPartitionTable)
           val completeBlockName = CarbonTablePath
             .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) +
                                CarbonCommonConstants.FACT_FILE_EXT)