You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by aj...@apache.org on 2019/11/12 05:37:11 UTC

[carbondata] branch master updated: Fix for SDK filter queries not working when schema is given explicitly in Add Segment

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 86f12c8  Fix for SDK filter queries not working when schema is given explicitly in Add Segment
86f12c8 is described below

commit 86f12c8ea624e88b1d455b6e8f93474451f4a786
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Wed Oct 30 18:13:35 2019 +0530

    Fix for SDK filter queries not working when schema is given explicitly in Add Segment
    
    Problem 1: Queries will not return correct results from an added segment
    when the schema is given explicitly in the case of SDK.
    
    Solution: Handled it by validating based on both the column name and the
    column id, checking whether they match for the SDK column.
    
    Problem 2: While deleting an added segment, the physical location was also
    getting deleted. Fixed that by adding a validation check.
    
    This closes #3427
---
 .../core/datastore/block/SegmentProperties.java    |  6 ++---
 .../core/metadata/schema/table/CarbonTable.java    |  2 +-
 .../metadata/schema/table/column/CarbonColumn.java |  6 +++++
 .../scan/executor/impl/AbstractQueryExecutor.java  |  2 +-
 .../core/scan/executor/util/RestructureUtil.java   | 19 +++++++++++---
 .../carbondata/core/scan/filter/FilterUtil.java    | 12 ++++-----
 .../executer/RowLevelFilterExecuterImpl.java       |  4 +--
 .../apache/carbondata/core/util/CarbonUtil.java    | 16 +++++++++---
 .../carbondata/core/util/DeleteLoadFolders.java    |  6 +++--
 .../testsuite/addsegment/AddSegmentTestCase.scala  | 30 ++++++++++++++++++----
 10 files changed, 75 insertions(+), 28 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
index c79ea12..ca9a6ee 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
@@ -628,11 +628,11 @@ public class SegmentProperties {
   /**
    * This method will search for a given measure in the current block measures list
    *
-   * @param columnId
+   * @param measureToBeSearched
    * @return
    */
-  public CarbonMeasure getMeasureFromCurrentBlock(String columnId) {
-    return CarbonUtil.getMeasureFromCurrentBlock(this.measures, columnId);
+  public CarbonMeasure getMeasureFromCurrentBlock(CarbonMeasure measureToBeSearched) {
+    return CarbonUtil.getMeasureFromCurrentBlock(this.measures, measureToBeSearched);
   }
 
   public int getNumberOfSortColumns() {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index b36a60b..d0a6d1d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1302,7 +1302,7 @@ public class CarbonTable implements Serializable, Writable {
         CarbonMeasure measure = getMeasureByName(tableName, measureColumn);
         if (null != measure) {
           CarbonMeasure measureFromCurrentBlock =
-              segmentProperties.getMeasureFromCurrentBlock(measure.getColumnId());
+              segmentProperties.getMeasureFromCurrentBlock(measure);
           if (null != measureFromCurrentBlock) {
             minMaxCachedColsList.add(measureFromCurrentBlock);
           }
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
index 809cc30..16ce5a5 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
@@ -206,4 +206,10 @@ public class CarbonColumn implements Serializable {
   public void setUseActualData(boolean useActualData) {
     this.useActualData = useActualData;
   }
+
+  public boolean isColmatchBasedOnId(CarbonColumn queryColumn) {
+    return this.getColName().equalsIgnoreCase(this.getColumnId()) && this.getColName()
+        .equalsIgnoreCase(queryColumn.getColName());
+  }
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 6760e77..b3128a7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -716,7 +716,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
       Set<CarbonMeasure> updatedFilterMeasures = new HashSet<>(queryFilterMeasures.size());
       for (CarbonMeasure queryMeasure : queryFilterMeasures) {
         CarbonMeasure measureFromCurrentBlock =
-            segmentProperties.getMeasureFromCurrentBlock(queryMeasure.getColumnId());
+            segmentProperties.getMeasureFromCurrentBlock(queryMeasure);
         if (null != measureFromCurrentBlock) {
           updatedFilterMeasures.add(measureFromCurrentBlock);
         }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index 0f93227..c88e259 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -167,7 +167,8 @@ public class RestructureUtil {
     // column ID but can have same column name
     if (tableColumn.getDataType().isComplexType() && !(tableColumn.getDataType().getId()
         == DataTypes.ARRAY_TYPE_ID)) {
-      if (tableColumn.getColumnId().equalsIgnoreCase(queryColumn.getColumnId())) {
+      if (tableColumn.getColumnId().equalsIgnoreCase(queryColumn.getColumnId()) || tableColumn
+          .isColmatchBasedOnId(queryColumn)) {
         return true;
       } else {
         return isColumnMatchesStruct(tableColumn, queryColumn);
@@ -175,7 +176,11 @@ public class RestructureUtil {
     } else {
       return (tableColumn.getColumnId().equalsIgnoreCase(queryColumn.getColumnId()) || (
           !isTransactionalTable && tableColumn.getColName()
-              .equalsIgnoreCase(queryColumn.getColName())));
+              .equalsIgnoreCase(queryColumn.getColName()))
+          // In case of SDK, columnId is same as columnName therefore the following check will
+          // ensure that if the table columnName is same as the query columnName and the table
+          // columnId is the same as table columnName then it's a valid columnName to be scanned.
+          || tableColumn.isColmatchBasedOnId(queryColumn));
     }
   }
 
@@ -200,8 +205,14 @@ public class RestructureUtil {
         }
         carbonDimension = CarbonTable.getCarbonDimension(tempColName.toString(), parentDimension);
         if (carbonDimension != null) {
+          // In case of SDK the columnId and columnName is same and this check will ensure for
+          // all the child columns that the table column name is equal to query column name and
+          // table columnId is equal to table columnName
           if (carbonDimension.getColumnSchema().getColumnUniqueId()
-              .equalsIgnoreCase(queryColumn.getColumnId())) {
+              .equalsIgnoreCase(queryColumn.getColumnId()) || (
+              carbonDimension.getColumnSchema().getColumnUniqueId()
+                  .equalsIgnoreCase(carbonDimension.getColName()) && carbonDimension.getColName()
+                  .equalsIgnoreCase(queryColumn.getColName()))) {
             return true;
           }
           if (carbonDimension.getListOfChildDimensions() != null) {
@@ -429,7 +440,7 @@ public class RestructureUtil {
       // then setting measure exists is true
       // otherwise adding a default value of a measure
       for (CarbonMeasure carbonMeasure : currentBlockMeasures) {
-        if (isColumnMatches(isTransactionalTable, carbonMeasure, queryMeasure.getMeasure())) {
+        if (isColumnMatches(isTransactionalTable, queryMeasure.getMeasure(), carbonMeasure)) {
           ProjectionMeasure currentBlockMeasure = new ProjectionMeasure(carbonMeasure);
           carbonMeasure.getColumnSchema().setDataType(queryMeasure.getMeasure().getDataType());
           carbonMeasure.getColumnSchema().setPrecision(queryMeasure.getMeasure().getPrecision());
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 59d8c5a..cb9c930 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -241,8 +241,8 @@ public final class FilterUtil {
       MeasureColumnResolvedFilterInfo msrColResolvedFilterInfo,
       SegmentProperties segmentProperties) {
     if (null != msrColResolvedFilterInfo && msrColResolvedFilterInfo.getMeasure().isMeasure()) {
-      CarbonMeasure measuresFromCurrentBlock = segmentProperties
-          .getMeasureFromCurrentBlock(msrColResolvedFilterInfo.getMeasure().getColumnId());
+      CarbonMeasure measuresFromCurrentBlock =
+          segmentProperties.getMeasureFromCurrentBlock(msrColResolvedFilterInfo.getMeasure());
       if (null != measuresFromCurrentBlock) {
         // update dimension and column index according to the dimension position in current block
         MeasureColumnResolvedFilterInfo msrColResolvedFilterInfoCopyObject =
@@ -356,8 +356,8 @@ public final class FilterUtil {
     boolean replaceCurrentNodeWithTrueFilter = false;
     CarbonColumn columnFromCurrentBlock = null;
     if (isMeasure) {
-      columnFromCurrentBlock = segmentProperties
-          .getMeasureFromCurrentBlock(columnResolvedFilterInfo.getMeasure().getColumnId());
+      columnFromCurrentBlock =
+          segmentProperties.getMeasureFromCurrentBlock(columnResolvedFilterInfo.getMeasure());
     } else {
       columnFromCurrentBlock =
           segmentProperties.getDimensionFromCurrentBlock(columnResolvedFilterInfo.getDimension());
@@ -422,8 +422,8 @@ public final class FilterUtil {
       SegmentProperties segmentProperties) {
 
     if (null != msrColResolvedFilterInfo && msrColResolvedFilterInfo.getMeasure().isMeasure()) {
-      CarbonMeasure measuresFromCurrentBlock = segmentProperties
-          .getMeasureFromCurrentBlock(msrColResolvedFilterInfo.getMeasure().getColumnId());
+      CarbonMeasure measuresFromCurrentBlock =
+          segmentProperties.getMeasureFromCurrentBlock(msrColResolvedFilterInfo.getMeasure());
       if (null != measuresFromCurrentBlock) {
         // update dimension and column index according to the dimension position in current block
         MeasureColumnResolvedFilterInfo msrColResolvedFilterInfoCopyObject =
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index 63ae0cd..9f1090e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -181,8 +181,8 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
   private void initMeasureChunkIndexes() {
     for (int i = 0; i < msrColEvalutorInfoList.size(); i++) {
       // find the measure in the current block measures list
-      CarbonMeasure measureFromCurrentBlock = segmentProperties.getMeasureFromCurrentBlock(
-          msrColEvalutorInfoList.get(i).getCarbonColumn().getColumnId());
+      CarbonMeasure measureFromCurrentBlock =
+          segmentProperties.getMeasureFromCurrentBlock(msrColEvalutorInfoList.get(i).getMeasure());
       if (null != measureFromCurrentBlock) {
         msrColEvalutorInfoList.get(i).setColumnIndex(measureFromCurrentBlock.getOrdinal());
         this.measureChunkIndex[i] = msrColEvalutorInfoList.get(i).getColumnIndexInMinMaxByteArray();
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 66618a2..141ed76 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1142,7 +1142,11 @@ public final class CarbonUtil {
       List<CarbonDimension> blockDimensions, CarbonDimension dimensionToBeSearched) {
     CarbonDimension currentBlockDimension = null;
     for (CarbonDimension blockDimension : blockDimensions) {
-      if (dimensionToBeSearched.getColumnId().equalsIgnoreCase(blockDimension.getColumnId())) {
+      // In case of SDK, columnId is same as columnName therefore the following check will
+      // ensure that if the dimensions columnName is same as the block columnName and the dimension
+      // columnId is the same as dimensions columnName then it's a valid column to be scanned.
+      if (dimensionToBeSearched.getColumnId().equalsIgnoreCase(blockDimension.getColumnId())
+          || blockDimension.isColmatchBasedOnId(dimensionToBeSearched)) {
         currentBlockDimension = blockDimension;
         break;
       }
@@ -1154,14 +1158,18 @@ public final class CarbonUtil {
    * This method will search for a given measure in the current block measures list
    *
    * @param blockMeasures
-   * @param columnId
+   * @param measureToBeSearched
    * @return
    */
   public static CarbonMeasure getMeasureFromCurrentBlock(List<CarbonMeasure> blockMeasures,
-      String columnId) {
+      CarbonMeasure measureToBeSearched) {
     CarbonMeasure currentBlockMeasure = null;
     for (CarbonMeasure blockMeasure : blockMeasures) {
-      if (columnId.equals(blockMeasure.getColumnId())) {
+      // In case of SDK, columnId is same as columnName therefore the following check will
+      // ensure that if the measures columnName is same as the block columnName and the measures
+      // columnId is the same as measures columnName then it's a valid column to be scanned.
+      if (measureToBeSearched.getColumnId().equalsIgnoreCase(blockMeasure.getColumnId())
+          || blockMeasure.isColmatchBasedOnId(measureToBeSearched)) {
         currentBlockMeasure = blockMeasure;
         break;
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
index 21c504b..a8e399b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -192,8 +192,10 @@ public final class DeleteLoadFolders {
 
   private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad,
       boolean isForceDelete) {
-    if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
-        SegmentStatus.COMPACTED == oneLoad.getSegmentStatus())) {
+    // Check if the segment is added externally and path is set then do not delete it
+    if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus()
+        || SegmentStatus.COMPACTED == oneLoad.getSegmentStatus()) && (oneLoad.getPath() == null
+        || oneLoad.getPath().equalsIgnoreCase("NA"))) {
       if (isForceDelete) {
         return true;
       }
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 50407fc..129c0f0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -31,12 +31,13 @@ import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport
-import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter}
+import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter, Field, Schema}
 import org.junit.Assert
 import scala.io.Source
 
 import org.apache.carbondata.common.Strings
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.metadata.datatype.DataTypes
 
 class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
 
@@ -110,7 +111,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     sql("delete from table addsegment1 where segment.id in (2)")
     sql("clean files for table addsegment1")
     val oldFolder = FileFactory.getCarbonFile(newPath)
-    assert(oldFolder.listFiles.length == 0, "Added segment path should be deleted when clean files are called")
+    assert(oldFolder.listFiles.length == 2,
+      "Added segment path should not be deleted physically when clean files are called")
     FileFactory.deleteAllFilesOfDir(new File(newPath))
   }
 
@@ -145,7 +147,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
     sql("clean files for table addsegment1")
     val oldFolder = FileFactory.getCarbonFile(newPath)
-    assert(oldFolder.listFiles.length == 0, "Added segment path should be deleted when clean files are called")
+    assert(oldFolder.listFiles.length == 2,
+      "Added segment path should not be deleted physically when clean files are called")
     FileFactory.deleteAllFilesOfDir(new File(newPath))
   }
 
@@ -729,12 +732,28 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     val externalSegmentPath = storeLocation + "/" + "external_segment"
     FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
 
+    var fields: Array[Field] = new Array[Field](14)
+    fields(0) = new Field("empno", DataTypes.INT)
+    fields(1) = new Field("empname", DataTypes.STRING)
+    fields(2) = new Field("designation", DataTypes.STRING)
+    fields(3) = new Field("doj", DataTypes.TIMESTAMP)
+    fields(4) = new Field("workgroupcategory", DataTypes.INT)
+    fields(5) = new Field("workgroupcategoryname", DataTypes.STRING)
+    fields(6) = new Field("deptno", DataTypes.INT)
+    fields(7) = new Field("deptname", DataTypes.STRING)
+    fields(8) = new Field("projectcode", DataTypes.INT)
+    fields(9) = new Field("projectjoindate", DataTypes.TIMESTAMP)
+    fields(10) = new Field("projectenddate", DataTypes.DATE)
+    fields(11) = new Field("attendance", DataTypes.INT)
+    fields(12) = new Field("utilization", DataTypes.INT)
+    fields(13) = new Field("salary", DataTypes.INT)
+
+
     // write into external segment folder
     val writer = CarbonWriter.builder
       .outputPath(externalSegmentPath)
-      .withSchemaFile(s"$storeLocation/$tableName/Metadata/schema")
       .writtenBy("AddSegmentTestCase")
-      .withCsvInput()
+      .withCsvInput(new Schema(fields))
       .build()
     val source = Source.fromFile(s"$resourcesPath/data.csv")
     var count = 0
@@ -748,6 +767,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
 
     sql(s"alter table $tableName add segment options('path'='$externalSegmentPath', 'format'='carbon')").show()
     checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(20)))
+    checkAnswer(sql(s"select count(*) from $tableName where empno = 11"), Seq(Row(2)))
     checkAnswer(sql(s"select sum(empno) from $tableName where empname = 'arvind' "), Seq(Row(22)))
     FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
     sql(s"drop table $tableName")