You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by aj...@apache.org on 2019/11/12 05:37:11 UTC
[carbondata] branch master updated: Fix for SDK filter queries not
working when schema is given explicitly in Add Segment
This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 86f12c8 Fix for SDK filter queries not working when schema is given explicitly in Add Segment
86f12c8 is described below
commit 86f12c8ea624e88b1d455b6e8f93474451f4a786
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Wed Oct 30 18:13:35 2019 +0530
Fix for SDK filter queries not working when schema is given explicitly in Add Segment
Problem 1: Queries will not return correct results from an added segment
when the schema is given explicitly in the case of SDK.
Solution: Handled it by validating the match based on both the column name
and the column id for the SDK column.
Problem 2: While deleting an added segment, the physical location was also
getting deleted. Fixed by adding a validation that skips physical deletion
when an external segment path is set.
This closes #3427
---
.../core/datastore/block/SegmentProperties.java | 6 ++---
.../core/metadata/schema/table/CarbonTable.java | 2 +-
.../metadata/schema/table/column/CarbonColumn.java | 6 +++++
.../scan/executor/impl/AbstractQueryExecutor.java | 2 +-
.../core/scan/executor/util/RestructureUtil.java | 19 +++++++++++---
.../carbondata/core/scan/filter/FilterUtil.java | 12 ++++-----
.../executer/RowLevelFilterExecuterImpl.java | 4 +--
.../apache/carbondata/core/util/CarbonUtil.java | 16 +++++++++---
.../carbondata/core/util/DeleteLoadFolders.java | 6 +++--
.../testsuite/addsegment/AddSegmentTestCase.scala | 30 ++++++++++++++++++----
10 files changed, 75 insertions(+), 28 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
index c79ea12..ca9a6ee 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
@@ -628,11 +628,11 @@ public class SegmentProperties {
/**
* This method will search for a given measure in the current block measures list
*
- * @param columnId
+ * @param measureToBeSearched
* @return
*/
- public CarbonMeasure getMeasureFromCurrentBlock(String columnId) {
- return CarbonUtil.getMeasureFromCurrentBlock(this.measures, columnId);
+ public CarbonMeasure getMeasureFromCurrentBlock(CarbonMeasure measureToBeSearched) {
+ return CarbonUtil.getMeasureFromCurrentBlock(this.measures, measureToBeSearched);
}
public int getNumberOfSortColumns() {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index b36a60b..d0a6d1d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1302,7 +1302,7 @@ public class CarbonTable implements Serializable, Writable {
CarbonMeasure measure = getMeasureByName(tableName, measureColumn);
if (null != measure) {
CarbonMeasure measureFromCurrentBlock =
- segmentProperties.getMeasureFromCurrentBlock(measure.getColumnId());
+ segmentProperties.getMeasureFromCurrentBlock(measure);
if (null != measureFromCurrentBlock) {
minMaxCachedColsList.add(measureFromCurrentBlock);
}
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
index 809cc30..16ce5a5 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
@@ -206,4 +206,10 @@ public class CarbonColumn implements Serializable {
public void setUseActualData(boolean useActualData) {
this.useActualData = useActualData;
}
+
+ public boolean isColmatchBasedOnId(CarbonColumn queryColumn) {
+ return this.getColName().equalsIgnoreCase(this.getColumnId()) && this.getColName()
+ .equalsIgnoreCase(queryColumn.getColName());
+ }
+
}
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 6760e77..b3128a7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -716,7 +716,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
Set<CarbonMeasure> updatedFilterMeasures = new HashSet<>(queryFilterMeasures.size());
for (CarbonMeasure queryMeasure : queryFilterMeasures) {
CarbonMeasure measureFromCurrentBlock =
- segmentProperties.getMeasureFromCurrentBlock(queryMeasure.getColumnId());
+ segmentProperties.getMeasureFromCurrentBlock(queryMeasure);
if (null != measureFromCurrentBlock) {
updatedFilterMeasures.add(measureFromCurrentBlock);
}
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index 0f93227..c88e259 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -167,7 +167,8 @@ public class RestructureUtil {
// column ID but can have same column name
if (tableColumn.getDataType().isComplexType() && !(tableColumn.getDataType().getId()
== DataTypes.ARRAY_TYPE_ID)) {
- if (tableColumn.getColumnId().equalsIgnoreCase(queryColumn.getColumnId())) {
+ if (tableColumn.getColumnId().equalsIgnoreCase(queryColumn.getColumnId()) || tableColumn
+ .isColmatchBasedOnId(queryColumn)) {
return true;
} else {
return isColumnMatchesStruct(tableColumn, queryColumn);
@@ -175,7 +176,11 @@ public class RestructureUtil {
} else {
return (tableColumn.getColumnId().equalsIgnoreCase(queryColumn.getColumnId()) || (
!isTransactionalTable && tableColumn.getColName()
- .equalsIgnoreCase(queryColumn.getColName())));
+ .equalsIgnoreCase(queryColumn.getColName()))
+ // In case of SDK, columnId is same as columnName therefore the following check will
+ // ensure that if the table columnName is same as the query columnName and the table
+ // columnId is the same as table columnName then it's a valid columnName to be scanned.
+ || tableColumn.isColmatchBasedOnId(queryColumn));
}
}
@@ -200,8 +205,14 @@ public class RestructureUtil {
}
carbonDimension = CarbonTable.getCarbonDimension(tempColName.toString(), parentDimension);
if (carbonDimension != null) {
+ // In case of SDK the columnId and columnName is same and this check will ensure for
+ // all the child columns that the table column name is equal to query column name and
+ // table columnId is equal to table columnName
if (carbonDimension.getColumnSchema().getColumnUniqueId()
- .equalsIgnoreCase(queryColumn.getColumnId())) {
+ .equalsIgnoreCase(queryColumn.getColumnId()) || (
+ carbonDimension.getColumnSchema().getColumnUniqueId()
+ .equalsIgnoreCase(carbonDimension.getColName()) && carbonDimension.getColName()
+ .equalsIgnoreCase(queryColumn.getColName()))) {
return true;
}
if (carbonDimension.getListOfChildDimensions() != null) {
@@ -429,7 +440,7 @@ public class RestructureUtil {
// then setting measure exists is true
// otherwise adding a default value of a measure
for (CarbonMeasure carbonMeasure : currentBlockMeasures) {
- if (isColumnMatches(isTransactionalTable, carbonMeasure, queryMeasure.getMeasure())) {
+ if (isColumnMatches(isTransactionalTable, queryMeasure.getMeasure(), carbonMeasure)) {
ProjectionMeasure currentBlockMeasure = new ProjectionMeasure(carbonMeasure);
carbonMeasure.getColumnSchema().setDataType(queryMeasure.getMeasure().getDataType());
carbonMeasure.getColumnSchema().setPrecision(queryMeasure.getMeasure().getPrecision());
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 59d8c5a..cb9c930 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -241,8 +241,8 @@ public final class FilterUtil {
MeasureColumnResolvedFilterInfo msrColResolvedFilterInfo,
SegmentProperties segmentProperties) {
if (null != msrColResolvedFilterInfo && msrColResolvedFilterInfo.getMeasure().isMeasure()) {
- CarbonMeasure measuresFromCurrentBlock = segmentProperties
- .getMeasureFromCurrentBlock(msrColResolvedFilterInfo.getMeasure().getColumnId());
+ CarbonMeasure measuresFromCurrentBlock =
+ segmentProperties.getMeasureFromCurrentBlock(msrColResolvedFilterInfo.getMeasure());
if (null != measuresFromCurrentBlock) {
// update dimension and column index according to the dimension position in current block
MeasureColumnResolvedFilterInfo msrColResolvedFilterInfoCopyObject =
@@ -356,8 +356,8 @@ public final class FilterUtil {
boolean replaceCurrentNodeWithTrueFilter = false;
CarbonColumn columnFromCurrentBlock = null;
if (isMeasure) {
- columnFromCurrentBlock = segmentProperties
- .getMeasureFromCurrentBlock(columnResolvedFilterInfo.getMeasure().getColumnId());
+ columnFromCurrentBlock =
+ segmentProperties.getMeasureFromCurrentBlock(columnResolvedFilterInfo.getMeasure());
} else {
columnFromCurrentBlock =
segmentProperties.getDimensionFromCurrentBlock(columnResolvedFilterInfo.getDimension());
@@ -422,8 +422,8 @@ public final class FilterUtil {
SegmentProperties segmentProperties) {
if (null != msrColResolvedFilterInfo && msrColResolvedFilterInfo.getMeasure().isMeasure()) {
- CarbonMeasure measuresFromCurrentBlock = segmentProperties
- .getMeasureFromCurrentBlock(msrColResolvedFilterInfo.getMeasure().getColumnId());
+ CarbonMeasure measuresFromCurrentBlock =
+ segmentProperties.getMeasureFromCurrentBlock(msrColResolvedFilterInfo.getMeasure());
if (null != measuresFromCurrentBlock) {
// update dimension and column index according to the dimension position in current block
MeasureColumnResolvedFilterInfo msrColResolvedFilterInfoCopyObject =
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index 63ae0cd..9f1090e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -181,8 +181,8 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
private void initMeasureChunkIndexes() {
for (int i = 0; i < msrColEvalutorInfoList.size(); i++) {
// find the measure in the current block measures list
- CarbonMeasure measureFromCurrentBlock = segmentProperties.getMeasureFromCurrentBlock(
- msrColEvalutorInfoList.get(i).getCarbonColumn().getColumnId());
+ CarbonMeasure measureFromCurrentBlock =
+ segmentProperties.getMeasureFromCurrentBlock(msrColEvalutorInfoList.get(i).getMeasure());
if (null != measureFromCurrentBlock) {
msrColEvalutorInfoList.get(i).setColumnIndex(measureFromCurrentBlock.getOrdinal());
this.measureChunkIndex[i] = msrColEvalutorInfoList.get(i).getColumnIndexInMinMaxByteArray();
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 66618a2..141ed76 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1142,7 +1142,11 @@ public final class CarbonUtil {
List<CarbonDimension> blockDimensions, CarbonDimension dimensionToBeSearched) {
CarbonDimension currentBlockDimension = null;
for (CarbonDimension blockDimension : blockDimensions) {
- if (dimensionToBeSearched.getColumnId().equalsIgnoreCase(blockDimension.getColumnId())) {
+ // In case of SDK, columnId is same as columnName therefore the following check will
+ // ensure that if the dimensions columnName is same as the block columnName and the dimension
+ // columnId is the same as dimensions columnName then it's a valid column to be scanned.
+ if (dimensionToBeSearched.getColumnId().equalsIgnoreCase(blockDimension.getColumnId())
+ || blockDimension.isColmatchBasedOnId(dimensionToBeSearched)) {
currentBlockDimension = blockDimension;
break;
}
@@ -1154,14 +1158,18 @@ public final class CarbonUtil {
* This method will search for a given measure in the current block measures list
*
* @param blockMeasures
- * @param columnId
+ * @param measureToBeSearched
* @return
*/
public static CarbonMeasure getMeasureFromCurrentBlock(List<CarbonMeasure> blockMeasures,
- String columnId) {
+ CarbonMeasure measureToBeSearched) {
CarbonMeasure currentBlockMeasure = null;
for (CarbonMeasure blockMeasure : blockMeasures) {
- if (columnId.equals(blockMeasure.getColumnId())) {
+ // In case of SDK, columnId is same as columnName therefore the following check will
+ // ensure that if the measures columnName is same as the block columnName and the measures
+ // columnId is the same as measures columnName then it's a valid column to be scanned.
+ if (measureToBeSearched.getColumnId().equalsIgnoreCase(blockMeasure.getColumnId())
+ || blockMeasure.isColmatchBasedOnId(measureToBeSearched)) {
currentBlockMeasure = blockMeasure;
break;
}
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
index 21c504b..a8e399b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -192,8 +192,10 @@ public final class DeleteLoadFolders {
private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad,
boolean isForceDelete) {
- if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
- SegmentStatus.COMPACTED == oneLoad.getSegmentStatus())) {
+ // Check if the segment is added externally and path is set then do not delete it
+ if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus()
+ || SegmentStatus.COMPACTED == oneLoad.getSegmentStatus()) && (oneLoad.getPath() == null
+ || oneLoad.getPath().equalsIgnoreCase("NA"))) {
if (isForceDelete) {
return true;
}
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 50407fc..129c0f0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -31,12 +31,13 @@ import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport
-import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter}
+import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter, Field, Schema}
import org.junit.Assert
import scala.io.Source
import org.apache.carbondata.common.Strings
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.metadata.datatype.DataTypes
class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
@@ -110,7 +111,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
sql("delete from table addsegment1 where segment.id in (2)")
sql("clean files for table addsegment1")
val oldFolder = FileFactory.getCarbonFile(newPath)
- assert(oldFolder.listFiles.length == 0, "Added segment path should be deleted when clean files are called")
+ assert(oldFolder.listFiles.length == 2,
+ "Added segment path should not be deleted physically when clean files are called")
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
@@ -145,7 +147,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
sql("clean files for table addsegment1")
val oldFolder = FileFactory.getCarbonFile(newPath)
- assert(oldFolder.listFiles.length == 0, "Added segment path should be deleted when clean files are called")
+ assert(oldFolder.listFiles.length == 2,
+ "Added segment path should not be deleted physically when clean files are called")
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
@@ -729,12 +732,28 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
val externalSegmentPath = storeLocation + "/" + "external_segment"
FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
+ var fields: Array[Field] = new Array[Field](14)
+ fields(0) = new Field("empno", DataTypes.INT)
+ fields(1) = new Field("empname", DataTypes.STRING)
+ fields(2) = new Field("designation", DataTypes.STRING)
+ fields(3) = new Field("doj", DataTypes.TIMESTAMP)
+ fields(4) = new Field("workgroupcategory", DataTypes.INT)
+ fields(5) = new Field("workgroupcategoryname", DataTypes.STRING)
+ fields(6) = new Field("deptno", DataTypes.INT)
+ fields(7) = new Field("deptname", DataTypes.STRING)
+ fields(8) = new Field("projectcode", DataTypes.INT)
+ fields(9) = new Field("projectjoindate", DataTypes.TIMESTAMP)
+ fields(10) = new Field("projectenddate", DataTypes.DATE)
+ fields(11) = new Field("attendance", DataTypes.INT)
+ fields(12) = new Field("utilization", DataTypes.INT)
+ fields(13) = new Field("salary", DataTypes.INT)
+
+
// write into external segment folder
val writer = CarbonWriter.builder
.outputPath(externalSegmentPath)
- .withSchemaFile(s"$storeLocation/$tableName/Metadata/schema")
.writtenBy("AddSegmentTestCase")
- .withCsvInput()
+ .withCsvInput(new Schema(fields))
.build()
val source = Source.fromFile(s"$resourcesPath/data.csv")
var count = 0
@@ -748,6 +767,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"alter table $tableName add segment options('path'='$externalSegmentPath', 'format'='carbon')").show()
checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(20)))
+ checkAnswer(sql(s"select count(*) from $tableName where empno = 11"), Seq(Row(2)))
checkAnswer(sql(s"select sum(empno) from $tableName where empname = 'arvind' "), Seq(Row(22)))
FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
sql(s"drop table $tableName")