You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:31 UTC
[33/47] incubator-carbondata git commit: [CARBONDATA-112]
Regexp_replace function is throwing NullPointerException (#867)
[CARBONDATA-112] Regexp_replace function is throwing NullPointerException (#867)
Because the expression is evaluated on the executor side in Carbon, the transient variable was not initialized.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/935e0d3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/935e0d3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/935e0d3a
Branch: refs/heads/master
Commit: 935e0d3a6dad7300adc980df6bfacc1272dff904
Parents: bf7f9f7
Author: nareshpr <pr...@gmail.com>
Authored: Fri Jul 29 00:21:39 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Fri Jul 29 00:21:39 2016 +0530
----------------------------------------------------------------------
.../filters/measurefilter/util/FilterUtil.java | 32 ++++++++++++++++++++
.../org/apache/spark/sql/CarbonOperators.scala | 8 ++++-
.../spark/sql/SparkUnknownExpression.scala | 20 +++++++++---
.../carbondata/spark/rdd/CarbonQueryRDD.scala | 17 +++++++++--
...estampDataTypeDirectDictionaryTestCase.scala | 17 +++++++++--
.../filterexpr/AllDataTypesTestCaseFilter.scala | 14 ++++++++-
6 files changed, 97 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/935e0d3a/core/src/main/java/org/carbondata/query/filters/measurefilter/util/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filters/measurefilter/util/FilterUtil.java b/core/src/main/java/org/carbondata/query/filters/measurefilter/util/FilterUtil.java
index fb02b74..5ac89c6 100644
--- a/core/src/main/java/org/carbondata/query/filters/measurefilter/util/FilterUtil.java
+++ b/core/src/main/java/org/carbondata/query/filters/measurefilter/util/FilterUtil.java
@@ -71,6 +71,7 @@ import org.carbondata.query.expression.ColumnExpression;
import org.carbondata.query.expression.Expression;
import org.carbondata.query.expression.ExpressionResult;
import org.carbondata.query.expression.LiteralExpression;
+import org.carbondata.query.expression.UnknownExpression;
import org.carbondata.query.expression.conditional.ListExpression;
import org.carbondata.query.expression.exception.FilterIllegalMemberException;
import org.carbondata.query.expression.exception.FilterUnsupportedException;
@@ -1351,4 +1352,35 @@ public final class FilterUtil {
LOGGER.error(e, CarbonCommonConstants.FILTER_INVALID_MEMBER + e.getMessage());
}
}
+
+ /**
+ * This method will return list of all the unknown expressions
+ *
+ * @param expression
+ */
+ public static List<UnknownExpression> getUnknownExpressionsList(Expression expression) {
+ List<UnknownExpression> listOfExp =
+ new ArrayList<UnknownExpression>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ getUnknownExpressionsList(expression, listOfExp);
+ return listOfExp;
+ }
+
+ /**
+ * This method will prepare the list with all unknown expressions
+ *
+ * @param expression
+ * @param lst
+ */
+ private static void getUnknownExpressionsList(Expression expression,
+ List<UnknownExpression> lst) {
+ if (expression instanceof UnknownExpression) {
+ UnknownExpression colExp = (UnknownExpression) expression;
+ lst.add(colExp);
+ return;
+ }
+ for (Expression child : expression.getChildren()) {
+ getUnknownExpressionsList(child, lst);
+ }
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/935e0d3a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
index 76b7e8c..e4e7ea4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
@@ -570,10 +570,16 @@ case class CarbonTableScan(
conf,
cubeCreationTime,
schemaLastUpdatedTime,
- carbonCatalog.storePath)
+ carbonCatalog.storePath,
+ this)
big
}
+ def newProjection(expression: org.apache.spark.sql.catalyst.expressions.Expression):
+ InternalRow => Any = {
+ super.newProjection(Seq(expression), output)
+ }
+
def doExecute(): RDD[InternalRow] = {
def toType(obj: Any): Any = {
obj match {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/935e0d3a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index 058cea5..1162db4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -21,6 +21,7 @@ import java.util.{ArrayList, List}
import scala.collection.JavaConverters._
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericMutableRow}
import org.carbondata.core.carbon.metadata.encoder.Encoding
@@ -30,9 +31,11 @@ import org.carbondata.query.expression.conditional.ConditionalExpression
import org.carbondata.query.expression.exception.FilterUnsupportedException
import org.carbondata.spark.util.CarbonScalaUtil
-class SparkUnknownExpression(sparkExp: SparkExpression)
+class SparkUnknownExpression(var sparkExp: SparkExpression)
extends UnknownExpression with ConditionalExpression {
+ private var evaluateExpression: (InternalRow) => Any = sparkExp.eval
+ private var isExecutor: Boolean = false
children.addAll(getColumnList())
override def evaluate(carbonRowInstance: RowIntf): ExpressionResult = {
@@ -49,10 +52,13 @@ class SparkUnknownExpression(sparkExp: SparkExpression)
}
}
try {
- val sparkRes = sparkExp.eval(
- new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray)
- )
-
+ val result = evaluateExpression(
+ new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray))
+ val sparkRes = if (isExecutor) {
+ result.asInstanceOf[InternalRow].get(0, sparkExp.dataType)
+ } else {
+ result
+ }
new ExpressionResult(CarbonScalaUtil.convertSparkToCarbonDataType(sparkExp.dataType),
sparkRes
)
@@ -70,6 +76,10 @@ class SparkUnknownExpression(sparkExp: SparkExpression)
sparkExp.toString()
}
+ def setEvaluateExpression(evaluateExpression: (InternalRow) => Any): Unit = {
+ this.evaluateExpression = evaluateExpression
+ isExecutor = true
+ }
def getColumnList(): java.util.List[ColumnExpression] = {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/935e0d3a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
index 97f1993..50d3ca7 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.CarbonTableScan
+import org.apache.spark.sql.SparkUnknownExpression
import org.apache.spark.sql.hive.DistributionUtil
import org.carbondata.common.logging.LogServiceFactory
@@ -39,11 +41,11 @@ import org.carbondata.query.carbon.executor.QueryExecutorFactory
import org.carbondata.query.carbon.model.QueryModel
import org.carbondata.query.carbon.result.RowResult
import org.carbondata.query.expression.Expression
+import org.carbondata.query.filters.measurefilter.util.FilterUtil
import org.carbondata.spark.Value
import org.carbondata.spark.load.CarbonLoaderUtil
import org.carbondata.spark.util.QueryPlanUtil
-
class CarbonSparkPartition(rddId: Int, val idx: Int,
val locations: Array[String],
val tableBlockInfos: util.List[TableBlockInfo])
@@ -69,7 +71,8 @@ class CarbonQueryRDD[V: ClassTag](
@transient conf: Configuration,
cubeCreationTime: Long,
schemaLastUpdatedTime: Long,
- baseStoreLocation: String)
+ baseStoreLocation: String,
+ sparkPlan: CarbonTableScan = null)
extends RDD[V](sc, Nil) with Logging {
val defaultParallelism = sc.defaultParallelism
@@ -169,6 +172,16 @@ class CarbonQueryRDD[V: ClassTag](
override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val filterResolverTree = this.queryModel.getFilterExpressionResolverTree
+ if (null != filterResolverTree && null != sparkPlan) {
+ FilterUtil.getUnknownExpressionsList(filterResolverTree.getFilterExpression).
+ asScala.foreach(unknownExpression => {
+ val unKnownSparkExpression = unknownExpression.
+ asInstanceOf[org.apache.spark.sql.SparkUnknownExpression]
+ unKnownSparkExpression.setEvaluateExpression(
+ sparkPlan.newProjection(unKnownSparkExpression.sparkExp))
+ })
+ }
val iter = new Iterator[V] {
var rowIterator: CarbonIterator[Array[Object]] = _
var queryStartTime: Long = 0
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/935e0d3a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
index f325254..db67c86 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
@@ -113,12 +113,25 @@ class TimestampDataTypeDirectDictionaryTest extends QueryTest with BeforeAndAfte
}
- test("select doj from directDictionaryCube with greater than filter") {
+ test("select doj from directDictionaryCube with regexp_replace equals filter") {
+ checkAnswer(
+ sql("select doj from directDictionaryCube where regexp_replace(doj, '-', '/') = '2016/03/14 15:00:09'"),
+ Seq(Row(Timestamp.valueOf("2016-03-14 15:00:09")))
+ )
+ }
+
+ test("select doj from directDictionaryCube with regexp_replace NOT IN filter") {
+ checkAnswer(
+ sql("select doj from directDictionaryCube where regexp_replace(doj, '-', '/') NOT IN ('2016/03/14 15:00:09')"),
+ Seq(Row(Timestamp.valueOf("2016-04-14 15:00:09")), Row(null))
+ )
+ }
+
+ test("select doj from directDictionaryCube with greater than filter") {
checkAnswer(
sql("select doj from directDictionaryCube where doj>'2016-03-14 15:00:09'"),
Seq(Row(Timestamp.valueOf("2016-04-14 15:00:09")))
)
-
}
test("select count(doj) from directDictionaryCube") {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/935e0d3a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
index d9abe75..7cb6dfd 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
@@ -46,7 +46,19 @@ class AllDataTypesTestCaseFilter extends QueryTest with BeforeAndAfterAll {
sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeFilter where empname in ('arvind','ayushi') group by empno,empname,utilization"),
sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeFilter_hive where empname in ('arvind','ayushi') group by empno,empname,utilization"))
}
-
+
+ test("select empno,empname from alldatatypescubeFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') NOT IN ('development')") {
+ checkAnswer(
+ sql("select empno,empname from alldatatypescubeFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') NOT IN ('development')"),
+ sql("select empno,empname from alldatatypescubeFilter_hive where regexp_replace(workgroupcategoryname, 'er', 'ment') NOT IN ('development')"))
+ }
+
+ test("select empno,empname from alldatatypescubeFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'") {
+ checkAnswer(
+ sql("select empno,empname from alldatatypescubeFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'"),
+ sql("select empno,empname from alldatatypescubeFilter_hive where regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'"))
+ }
+
override def afterAll {
sql("drop table alldatatypescubeFilter")
sql("drop table alldatatypescubeFilter_hive")