You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/03 19:30:56 UTC
carbondata git commit: [CARBONDATA-1454]false expression handling and
block pruning
Repository: carbondata
Updated Branches:
refs/heads/master fa6cd8d58 -> e16e87818
[CARBONDATA-1454]false expression handling and block pruning
Issue: In case of a wrong/invalid value for timestamp and date data types, all blocks are identified for scan.
Solution: Add False Expression handling and a False Filter Executor. It can be used to handle invalid filter values.
This closes #1915
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e16e8781
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e16e8781
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e16e8781
Branch: refs/heads/master
Commit: e16e878189baa82bee5ca8af8d1229b7733b454a
Parents: fa6cd8d
Author: BJangir <ba...@gmail.com>
Authored: Fri Feb 2 16:33:45 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sun Feb 4 00:59:25 2018 +0530
----------------------------------------------------------------------
.../scan/filter/FilterExpressionProcessor.java | 3 +-
.../carbondata/core/scan/filter/FilterUtil.java | 3 +
.../filter/executer/FalseFilterExecutor.java | 60 ++++++++++++++++
.../scan/filter/intf/FilterExecuterType.java | 2 +-
.../FalseConditionalResolverImpl.java | 61 ++++++++++++++++
.../filterexpr/FilterProcessorTestCase.scala | 74 +++++++++++++++++++-
.../apache/spark/sql/CarbonBoundReference.scala | 4 ++
.../execution/CastExpressionOptimization.scala | 60 +++++++++++++---
.../strategy/CarbonLateDecodeStrategy.scala | 2 +
.../spark/sql/optimizer/CarbonFilters.scala | 4 ++
10 files changed, 259 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e16e8781/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index 5a1b7df..3e23aa3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -63,6 +63,7 @@ import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.filter.resolver.LogicalFilterResolverImpl;
import org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl;
import org.apache.carbondata.core.scan.filter.resolver.RowLevelRangeFilterResolverImpl;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.FalseConditionalResolverImpl;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.TrueConditionalResolverImpl;
import org.apache.carbondata.core.scan.partition.PartitionUtil;
import org.apache.carbondata.core.scan.partition.Partitioner;
@@ -398,7 +399,7 @@ public class FilterExpressionProcessor implements FilterProcessor {
ConditionalExpression condExpression = null;
switch (filterExpressionType) {
case FALSE:
- return new RowLevelFilterResolverImpl(expression, false, false, tableIdentifier);
+ return new FalseConditionalResolverImpl(expression, false, false, tableIdentifier);
case TRUE:
return new TrueConditionalResolverImpl(expression, false, false, tableIdentifier);
case EQUALS:
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e16e8781/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 3268ca3..a08edc0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -74,6 +74,7 @@ import org.apache.carbondata.core.scan.filter.executer.AndFilterExecuterImpl;
import org.apache.carbondata.core.scan.filter.executer.DimColumnExecuterFilterInfo;
import org.apache.carbondata.core.scan.filter.executer.ExcludeColGroupFilterExecuterImpl;
import org.apache.carbondata.core.scan.filter.executer.ExcludeFilterExecuterImpl;
+import org.apache.carbondata.core.scan.filter.executer.FalseFilterExecutor;
import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
import org.apache.carbondata.core.scan.filter.executer.ImplicitIncludeFilterExecutorImpl;
import org.apache.carbondata.core.scan.filter.executer.IncludeColGroupFilterExecuterImpl;
@@ -176,6 +177,8 @@ public final class FilterUtil {
.getFilterRangeValues(segmentProperties), segmentProperties);
case TRUE:
return new TrueFilterExecutor();
+ case FALSE:
+ return new FalseFilterExecutor();
case ROWLEVEL:
default:
return new RowLevelFilterExecuterImpl(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e16e8781/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FalseFilterExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FalseFilterExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FalseFilterExecutor.java
new file mode 100644
index 0000000..2d2a15c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FalseFilterExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
+
+/**
+ * API will apply filter based on resolver instance
+ *
+ * @return
+ * @throws FilterUnsupportedException
+ */
+public class FalseFilterExecutor implements FilterExecuter {
+
+ @Override
+ public BitSetGroup applyFilter(BlocksChunkHolder blocksChunkHolder, boolean useBitsetPipeline)
+ throws FilterUnsupportedException, IOException {
+ int numberOfPages = blocksChunkHolder.getDataBlock().numberOfPages();
+ BitSetGroup group = new BitSetGroup(numberOfPages);
+ for (int i = 0; i < numberOfPages; i++) {
+ BitSet set = new BitSet();
+ group.setBitSet(set, i);
+ }
+ return group;
+ }
+
+ @Override public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+ throws FilterUnsupportedException, IOException {
+ return false;
+ }
+
+ @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+
+ return new BitSet();
+ }
+
+ @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
+ // Do Nothing
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e16e8781/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/FilterExecuterType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/FilterExecuterType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/FilterExecuterType.java
index 42defc6..d10b2e5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/FilterExecuterType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/FilterExecuterType.java
@@ -21,6 +21,6 @@ import java.io.Serializable;
public enum FilterExecuterType implements Serializable {
INCLUDE, EXCLUDE, OR, AND, RESTRUCTURE, ROWLEVEL, RANGE, ROWLEVEL_GREATERTHAN,
- ROWLEVEL_GREATERTHAN_EQUALTO, ROWLEVEL_LESSTHAN_EQUALTO, ROWLEVEL_LESSTHAN, TRUE
+ ROWLEVEL_GREATERTHAN_EQUALTO, ROWLEVEL_LESSTHAN_EQUALTO, ROWLEVEL_LESSTHAN, TRUE, FALSE
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e16e8781/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/FalseConditionalResolverImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/FalseConditionalResolverImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/FalseConditionalResolverImpl.java
new file mode 100644
index 0000000..eccda1e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/FalseConditionalResolverImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.filter.resolver.resolverinfo;
+
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.TableProvider;
+import org.apache.carbondata.core.scan.filter.intf.FilterExecuterType;
+import org.apache.carbondata.core.scan.filter.resolver.ConditionalFilterResolverImpl;
+
+/* The expression with If FALSE will be resolved setting empty bitset. */
+public class FalseConditionalResolverImpl extends ConditionalFilterResolverImpl {
+
+ private static final long serialVersionUID = 4599541011924324041L;
+
+ public FalseConditionalResolverImpl(Expression exp, boolean isExpressionResolve,
+ boolean isIncludeFilter, AbsoluteTableIdentifier tableIdentifier) {
+ super(exp, isExpressionResolve, isIncludeFilter, tableIdentifier, false);
+ }
+
+ @Override public void resolve(AbsoluteTableIdentifier absoluteTableIdentifier,
+ TableProvider tableProvider) {
+ }
+
+ /**
+ * This method will provide the executer type to the callee inorder to identify
+ * the executer type for the filter resolution, False Expresssion willl not execute anything.
+ * it will return empty bitset
+ */
+ @Override public FilterExecuterType getFilterExecuterType() {
+ return FilterExecuterType.FALSE;
+ }
+
+ /**
+ * Method will the read filter expression corresponding to the resolver.
+ * This method is required in row level executer inorder to evaluate the filter
+ * expression against spark, as mentioned above row level is a special type
+ * filter resolver.
+ *
+ * @return Expression
+ */
+ public Expression getFilterExpresion() {
+ return exp;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e16e8781/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
index d54906f..147756f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
@@ -17,9 +17,9 @@
package org.apache.carbondata.spark.testsuite.filterexpr
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -132,6 +132,11 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"""LOAD DATA INPATH '$resourcesPath/big_int_Decimal.csv' INTO TABLE big_int_basicc_1 options ('DELIMITER'=',', 'QUOTECHAR'='\"', 'COMPLEX_DELIMITER_LEVEL_1'='$$','COMPLEX_DELIMITER_LEVEL_2'=':', 'FILEHEADER'= '')""")
sql(s"load data local inpath '$resourcesPath/big_int_Decimal.csv' into table big_int_basicc_Hive")
sql(s"load data local inpath '$resourcesPath/big_int_Decimal.csv' into table big_int_basicc_Hive_1")
+
+ sql("create table if not exists date_test(name String, age int, dob date,doj timestamp) stored by 'carbondata' ")
+ sql("insert into date_test select 'name1',12,'2014-01-01','2014-01-01 00:00:00' ")
+ sql("insert into date_test select 'name2',13,'2015-01-01','2015-01-01 00:00:00' ")
+ sql("insert into date_test select 'name3',14,'2016-01-01','2016-01-01 00:00:00' ")
}
test("Is not null filter") {
@@ -287,6 +292,70 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists outofrange")
}
+ test("check invalid date value") {
+ val df=sql("select * from date_test where dob=''")
+ assert(df.count()==0,"Wrong data are displayed on invalid date ")
+ }
+
+ test("check invalid date with and filter value ") {
+ val df=sql("select * from date_test where dob='' and age=13")
+ assert(df.count()==0,"Wrong data are displayed on invalid date ")
+ }
+
+ test("check invalid date with or filter value ") {
+ val df=sql("select * from date_test where dob='' or age=13")
+ checkAnswer(df,Seq(Row("name2",13,Date.valueOf("2015-01-01"),Timestamp.valueOf("2015-01-01 00:00:00.0"))))
+ }
+
+ test("check invalid date Geaterthan filter value ") {
+ val df=sql("select * from date_test where doj > '0' ")
+ checkAnswer(df,Seq(Row("name1",12,Date.valueOf("2014-01-01"),Timestamp.valueOf("2014-01-01 00:00:00.0")),
+ Row("name2",13,Date.valueOf("2015-01-01"),Timestamp.valueOf("2015-01-01 00:00:00.0")),
+ Row("name3",14,Date.valueOf("2016-01-01"),Timestamp.valueOf("2016-01-01 00:00:00.0"))))
+ }
+ test("check invalid date Geaterthan and lessthan filter value ") {
+ val df=sql("select * from date_test where doj > '0' and doj < '2015-01-01' ")
+ checkAnswer(df,Seq(Row("name1",12,Date.valueOf("2014-01-01"),Timestamp.valueOf("2014-01-01 00:00:00.0"))))
+ }
+ test("check invalid date Geaterthan or lessthan filter value ") {
+ val df=sql("select * from date_test where doj > '0' or doj < '2015-01-01' ")
+ checkAnswer(df,Seq(Row("name1",12,Date.valueOf("2014-01-01"),Timestamp.valueOf("2014-01-01 00:00:00.0")),
+ Row("name2",13,Date.valueOf("2015-01-01"),Timestamp.valueOf("2015-01-01 00:00:00.0")),
+ Row("name3",14,Date.valueOf("2016-01-01"),Timestamp.valueOf("2016-01-01 00:00:00.0"))))
+ }
+
+ test("check invalid timestamp value") {
+ val df=sql("select * from date_test where dob=''")
+ assert(df.count()==0,"Wrong data are displayed on invalid timestamp ")
+ }
+
+ test("check invalid timestamp with and filter value ") {
+ val df=sql("select * from date_test where doj='' and age=13")
+ assert(df.count()==0,"Wrong data are displayed on invalid timestamp ")
+ }
+
+ test("check invalid timestamp with or filter value ") {
+ val df=sql("select * from date_test where doj='' or age=13")
+ checkAnswer(df,Seq(Row("name2",13,Date.valueOf("2015-01-01"),Timestamp.valueOf("2015-01-01 00:00:00.0"))))
+ }
+
+ test("check invalid timestamp Geaterthan filter value ") {
+ val df=sql("select * from date_test where doj > '0' ")
+ checkAnswer(df,Seq(Row("name1",12,Date.valueOf("2014-01-01"),Timestamp.valueOf("2014-01-01 00:00:00.0")),
+ Row("name2",13,Date.valueOf("2015-01-01"),Timestamp.valueOf("2015-01-01 00:00:00.0")),
+ Row("name3",14,Date.valueOf("2016-01-01"),Timestamp.valueOf("2016-01-01 00:00:00.0"))))
+ }
+ test("check invalid timestamp Geaterthan and lessthan filter value ") {
+ val df=sql("select * from date_test where doj > '0' and doj < '2015-01-01 00:00:00' ")
+ checkAnswer(df,Seq(Row("name1",12,Date.valueOf("2014-01-01"),Timestamp.valueOf("2014-01-01 00:00:00.0"))))
+ }
+ test("check invalid timestamp Geaterthan or lessthan filter value ") {
+ val df=sql("select * from date_test where doj > '0' or doj < '2015-01-01 00:00:00' ")
+ checkAnswer(df,Seq(Row("name1",12,Date.valueOf("2014-01-01"),Timestamp.valueOf("2014-01-01 00:00:00.0")),
+ Row("name2",13,Date.valueOf("2015-01-01"),Timestamp.valueOf("2015-01-01 00:00:00.0")),
+ Row("name3",14,Date.valueOf("2016-01-01"),Timestamp.valueOf("2016-01-01 00:00:00.0"))))
+ }
+
test("like% test case with restructure") {
sql("drop table if exists like_filter")
sql(
@@ -319,6 +388,7 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS filtertestTablesWithNullJoin")
sql("drop table if exists like_filter")
CompactionSupportGlobalSortBigFileTest.deleteFile(file1)
+ sql("drop table if exists date_test")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e16e8781/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
index a043342..aa650e0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
@@ -29,6 +29,10 @@ case class CastExpr(expr: Expression) extends Filter {
override def references: Array[String] = null
}
+case class FalseExpr() extends Filter {
+ override def references: Array[String] = null
+}
+
case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
extends LeafExpression with NamedExpression with CodegenFallback {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e16e8781/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
index 2ff8c42..2de3fe6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not}
import org.apache.spark.sql.CastExpr
+import org.apache.spark.sql.FalseExpr
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
@@ -48,7 +49,7 @@ object CastExpressionOptimization {
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
parser.setTimeZone(TimeZone.getTimeZone("GMT"))
} else {
- throw new UnsupportedOperationException ("Unsupported DataType being evaluated.")
+ throw new UnsupportedOperationException("Unsupported DataType being evaluated.")
}
try {
val value = parser.parse(v.toString).getTime() * 1000L
@@ -123,6 +124,7 @@ object CastExpressionOptimization {
tempList.asScala
}
}
+
/**
* This routines tries to apply rules on Cast Filter Predicates and if the rules applied and the
* values can be toss back to native datatypes the cast is removed. Current two rules are applied
@@ -238,7 +240,7 @@ object CastExpressionOptimization {
case c@GreaterThan(Cast(a: Attribute, _), Literal(v, t)) =>
a.dataType match {
case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
- updateFilterForTimeStamp(v, c, ts)
+ updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
case i: IntegerType if t.sameType(DoubleType) =>
updateFilterForInt(v, c)
case s: ShortType if t.sameType(IntegerType) =>
@@ -248,7 +250,7 @@ object CastExpressionOptimization {
case c@GreaterThan(Literal(v, t), Cast(a: Attribute, _)) =>
a.dataType match {
case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
- updateFilterForTimeStamp(v, c, ts)
+ updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
case i: IntegerType if t.sameType(DoubleType) =>
updateFilterForInt(v, c)
case s: ShortType if t.sameType(IntegerType) =>
@@ -258,7 +260,7 @@ object CastExpressionOptimization {
case c@LessThan(Cast(a: Attribute, _), Literal(v, t)) =>
a.dataType match {
case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
- updateFilterForTimeStamp(v, c, ts)
+ updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
case i: IntegerType if t.sameType(DoubleType) =>
updateFilterForInt(v, c)
case s: ShortType if t.sameType(IntegerType) =>
@@ -268,7 +270,7 @@ object CastExpressionOptimization {
case c@LessThan(Literal(v, t), Cast(a: Attribute, _)) =>
a.dataType match {
case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
- updateFilterForTimeStamp(v, c, ts)
+ updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
case i: IntegerType if t.sameType(DoubleType) =>
updateFilterForInt(v, c)
case s: ShortType if t.sameType(IntegerType) =>
@@ -278,7 +280,7 @@ object CastExpressionOptimization {
case c@GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
a.dataType match {
case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
- updateFilterForTimeStamp(v, c, ts)
+ updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
case i: IntegerType if t.sameType(DoubleType) =>
updateFilterForInt(v, c)
case s: ShortType if t.sameType(IntegerType) =>
@@ -288,7 +290,7 @@ object CastExpressionOptimization {
case c@GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
a.dataType match {
case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
- updateFilterForTimeStamp(v, c, ts)
+ updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
case i: IntegerType if t.sameType(DoubleType) =>
updateFilterForInt(v, c)
case s: ShortType if t.sameType(IntegerType) =>
@@ -298,7 +300,7 @@ object CastExpressionOptimization {
case c@LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
a.dataType match {
case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
- updateFilterForTimeStamp(v, c, ts)
+ updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
case i: IntegerType if t.sameType(DoubleType) =>
updateFilterForInt(v, c)
case s: ShortType if t.sameType(IntegerType) =>
@@ -308,7 +310,7 @@ object CastExpressionOptimization {
case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
a.dataType match {
case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
- updateFilterForTimeStamp(v, c, ts)
+ updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
case i: IntegerType if t.sameType(DoubleType) =>
updateFilterForInt(v, c)
case s: ShortType if t.sameType(IntegerType) =>
@@ -320,6 +322,7 @@ object CastExpressionOptimization {
/**
* the method removes the cast for short type columns
+ *
* @param actualValue
* @param exp
* @return
@@ -350,6 +353,41 @@ object CastExpressionOptimization {
}
/**
+ *
+ * @param actualValue actual value of filter
+ * @param exp expression
+ * @param filter Filter Expression
+ * @return return CastExpression or same Filter
+ */
+ def updateFilterForNonEqualTimeStamp(actualValue: Any, exp: Expression, filter: Option[Filter]):
+ Option[sources.Filter] = {
+ filter.get match {
+ case FalseExpr() if (validTimeComparisionForSpark(actualValue)) =>
+ Some(CastExpr(exp))
+ case _ =>
+ filter
+ }
+ }
+
+ /**
+ * Spark compares data based on double also.
+ * Ex. slect * ...where time >0 , this will return all data
+ * So better give to Spark as Cast Expression.
+ *
+ * @param numericTimeValue
+ * @return if valid double return true,else false
+ */
+ def validTimeComparisionForSpark(numericTimeValue: Any): Boolean = {
+ try {
+ numericTimeValue.toString.toDouble
+ true
+ } catch {
+ case _ => false
+ }
+ }
+
+
+ /**
* the method removes the cast for timestamp type columns
*
* @param actualValue
@@ -362,10 +400,12 @@ object CastExpressionOptimization {
if (!newValue.equals(actualValue)) {
updateFilterBasedOnFilterType(exp, newValue)
} else {
- Some(CastExpr(exp))
+ Some(FalseExpr())
}
+
}
+
/**
* the method removes the cast for the respective filter type
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e16e8781/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 4b1d11b..544c494 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -616,6 +616,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
Some(CarbonEndsWith(c))
case c@Contains(a: Attribute, Literal(v, t)) =>
Some(CarbonContainsWith(c))
+ case c@Literal(v, t) if (v == null) =>
+ Some(FalseExpr())
case others => None
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e16e8781/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 4d91375..c7767ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -135,6 +135,8 @@ object CarbonFilters {
}))
case CastExpr(expr: Expression) =>
Some(transformExpression(expr))
+ case FalseExpr() =>
+ Some(new FalseExpression(null))
case _ => None
}
}
@@ -269,6 +271,8 @@ object CarbonFilters {
Some(CarbonContainsWith(c))
case c@Cast(a: Attribute, _) =>
Some(CastExpr(c))
+ case c@Literal(v, t) if v == null =>
+ Some(FalseExpr())
case others =>
if (!or) {
others.collect {