You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:31 UTC

[33/47] incubator-carbondata git commit: [CARBONDATA-112] Regexp_replace function is throwing NullPointerException (#867)

[CARBONDATA-112] Regexp_replace function is throwing NullPointerException (#867)

As the expression is evaluated on the executor side in Carbon, the transient variable was not initialized.

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/935e0d3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/935e0d3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/935e0d3a

Branch: refs/heads/master
Commit: 935e0d3a6dad7300adc980df6bfacc1272dff904
Parents: bf7f9f7
Author: nareshpr <pr...@gmail.com>
Authored: Fri Jul 29 00:21:39 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Fri Jul 29 00:21:39 2016 +0530

----------------------------------------------------------------------
 .../filters/measurefilter/util/FilterUtil.java  | 32 ++++++++++++++++++++
 .../org/apache/spark/sql/CarbonOperators.scala  |  8 ++++-
 .../spark/sql/SparkUnknownExpression.scala      | 20 +++++++++---
 .../carbondata/spark/rdd/CarbonQueryRDD.scala   | 17 +++++++++--
 ...estampDataTypeDirectDictionaryTestCase.scala | 17 +++++++++--
 .../filterexpr/AllDataTypesTestCaseFilter.scala | 14 ++++++++-
 6 files changed, 97 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/935e0d3a/core/src/main/java/org/carbondata/query/filters/measurefilter/util/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filters/measurefilter/util/FilterUtil.java b/core/src/main/java/org/carbondata/query/filters/measurefilter/util/FilterUtil.java
index fb02b74..5ac89c6 100644
--- a/core/src/main/java/org/carbondata/query/filters/measurefilter/util/FilterUtil.java
+++ b/core/src/main/java/org/carbondata/query/filters/measurefilter/util/FilterUtil.java
@@ -71,6 +71,7 @@ import org.carbondata.query.expression.ColumnExpression;
 import org.carbondata.query.expression.Expression;
 import org.carbondata.query.expression.ExpressionResult;
 import org.carbondata.query.expression.LiteralExpression;
+import org.carbondata.query.expression.UnknownExpression;
 import org.carbondata.query.expression.conditional.ListExpression;
 import org.carbondata.query.expression.exception.FilterIllegalMemberException;
 import org.carbondata.query.expression.exception.FilterUnsupportedException;
@@ -1351,4 +1352,35 @@ public final class FilterUtil {
       LOGGER.error(e, CarbonCommonConstants.FILTER_INVALID_MEMBER + e.getMessage());
     }
   }
+
+  /**
+   * Returns every UnknownExpression found by walking the given expression tree.
+   * The walk does not descend into the children of an UnknownExpression it finds.
+   * @param expression root of the expression tree to search
+   */
+  public static List<UnknownExpression> getUnknownExpressionsList(Expression expression) {
+    List<UnknownExpression> listOfExp =
+        new ArrayList<UnknownExpression>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    getUnknownExpressionsList(expression, listOfExp);
+    return listOfExp;
+  }
+
+  /**
+   * Recursively appends the unknown expressions under the given expression to the list.
+   *
+   * @param expression expression to inspect; its children are visited when it is not unknown
+   * @param lst        list that receives each UnknownExpression found
+   */
+  private static void getUnknownExpressionsList(Expression expression,
+      List<UnknownExpression> lst) {
+    if (expression instanceof UnknownExpression) {
+      UnknownExpression colExp = (UnknownExpression) expression;
+      lst.add(colExp);
+      return;
+    }
+    for (Expression child : expression.getChildren()) {
+      getUnknownExpressionsList(child, lst);
+    }
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/935e0d3a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
index 76b7e8c..e4e7ea4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
@@ -570,10 +570,16 @@ case class CarbonTableScan(
       conf,
       cubeCreationTime,
       schemaLastUpdatedTime,
-      carbonCatalog.storePath)
+      carbonCatalog.storePath,
+      this)
     big
   }
 
+  def newProjection(expression: org.apache.spark.sql.catalyst.expressions.Expression):
+      InternalRow => Any = {
+    super.newProjection(Seq(expression), output)
+  }
+
   def doExecute(): RDD[InternalRow] = {
     def toType(obj: Any): Any = {
       obj match {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/935e0d3a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index 058cea5..1162db4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -21,6 +21,7 @@ import java.util.{ArrayList, List}
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericMutableRow}
 
 import org.carbondata.core.carbon.metadata.encoder.Encoding
@@ -30,9 +31,11 @@ import org.carbondata.query.expression.conditional.ConditionalExpression
 import org.carbondata.query.expression.exception.FilterUnsupportedException
 import org.carbondata.spark.util.CarbonScalaUtil
 
-class SparkUnknownExpression(sparkExp: SparkExpression)
+class SparkUnknownExpression(var sparkExp: SparkExpression)
   extends UnknownExpression with ConditionalExpression {
 
+  private var evaluateExpression: (InternalRow) => Any = sparkExp.eval
+  private var isExecutor: Boolean = false
   children.addAll(getColumnList())
 
   override def evaluate(carbonRowInstance: RowIntf): ExpressionResult = {
@@ -49,10 +52,13 @@ class SparkUnknownExpression(sparkExp: SparkExpression)
       }
     }
     try {
-      val sparkRes = sparkExp.eval(
-        new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray)
-      )
-
+      val result = evaluateExpression(
+          new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray))
+      val sparkRes = if (isExecutor) {
+        result.asInstanceOf[InternalRow].get(0, sparkExp.dataType)
+      } else {
+        result
+      }
       new ExpressionResult(CarbonScalaUtil.convertSparkToCarbonDataType(sparkExp.dataType),
         sparkRes
       )
@@ -70,6 +76,10 @@ class SparkUnknownExpression(sparkExp: SparkExpression)
     sparkExp.toString()
   }
 
+  def setEvaluateExpression(evaluateExpression: (InternalRow) => Any): Unit = {
+    this.evaluateExpression = evaluateExpression
+    isExecutor = true
+  }
 
   def getColumnList(): java.util.List[ColumnExpression] = {
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/935e0d3a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
index 97f1993..50d3ca7 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.CarbonTableScan
+import org.apache.spark.sql.SparkUnknownExpression
 import org.apache.spark.sql.hive.DistributionUtil
 
 import org.carbondata.common.logging.LogServiceFactory
@@ -39,11 +41,11 @@ import org.carbondata.query.carbon.executor.QueryExecutorFactory
 import org.carbondata.query.carbon.model.QueryModel
 import org.carbondata.query.carbon.result.RowResult
 import org.carbondata.query.expression.Expression
+import org.carbondata.query.filters.measurefilter.util.FilterUtil
 import org.carbondata.spark.Value
 import org.carbondata.spark.load.CarbonLoaderUtil
 import org.carbondata.spark.util.QueryPlanUtil
 
-
 class CarbonSparkPartition(rddId: Int, val idx: Int,
   val locations: Array[String],
   val tableBlockInfos: util.List[TableBlockInfo])
@@ -69,7 +71,8 @@ class CarbonQueryRDD[V: ClassTag](
     @transient conf: Configuration,
     cubeCreationTime: Long,
     schemaLastUpdatedTime: Long,
-    baseStoreLocation: String)
+    baseStoreLocation: String,
+    sparkPlan: CarbonTableScan = null)
   extends RDD[V](sc, Nil) with Logging {
 
   val defaultParallelism = sc.defaultParallelism
@@ -169,6 +172,16 @@ class CarbonQueryRDD[V: ClassTag](
 
   override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val filterResolverTree = this.queryModel.getFilterExpressionResolverTree
+    if (null != filterResolverTree && null != sparkPlan) {
+      FilterUtil.getUnknownExpressionsList(filterResolverTree.getFilterExpression).
+          asScala.foreach(unknownExpression => {
+        val unKnownSparkExpression = unknownExpression.
+            asInstanceOf[org.apache.spark.sql.SparkUnknownExpression]
+        unKnownSparkExpression.setEvaluateExpression(
+            sparkPlan.newProjection(unKnownSparkExpression.sparkExp))
+      })
+    }
     val iter = new Iterator[V] {
       var rowIterator: CarbonIterator[Array[Object]] = _
       var queryStartTime: Long = 0

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/935e0d3a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
index f325254..db67c86 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
@@ -113,12 +113,25 @@ class TimestampDataTypeDirectDictionaryTest extends QueryTest with BeforeAndAfte
 
   }
   
-    test("select doj from directDictionaryCube with greater than filter") {
+  test("select doj from directDictionaryCube with regexp_replace equals filter") {
+    checkAnswer(
+      sql("select doj from directDictionaryCube where regexp_replace(doj, '-', '/') = '2016/03/14 15:00:09'"),
+      Seq(Row(Timestamp.valueOf("2016-03-14 15:00:09")))
+    )
+  }
+  
+  test("select doj from directDictionaryCube with regexp_replace NOT IN filter") {
+    checkAnswer(
+      sql("select doj from directDictionaryCube where regexp_replace(doj, '-', '/') NOT IN ('2016/03/14 15:00:09')"),
+      Seq(Row(Timestamp.valueOf("2016-04-14 15:00:09")), Row(null))
+    )
+  }
+  
+  test("select doj from directDictionaryCube with greater than filter") {
     checkAnswer(
       sql("select doj from directDictionaryCube where doj>'2016-03-14 15:00:09'"),
       Seq(Row(Timestamp.valueOf("2016-04-14 15:00:09")))
     )
-
   }
 
   test("select count(doj) from directDictionaryCube") {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/935e0d3a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
index d9abe75..7cb6dfd 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
@@ -46,7 +46,19 @@ class AllDataTypesTestCaseFilter extends QueryTest with BeforeAndAfterAll {
       sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeFilter where empname in ('arvind','ayushi') group by empno,empname,utilization"),
       sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeFilter_hive where empname in ('arvind','ayushi') group by empno,empname,utilization"))
   }
-
+  
+  test("select empno,empname from alldatatypescubeFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') NOT IN ('development')") {
+    checkAnswer(
+      sql("select empno,empname from alldatatypescubeFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') NOT IN ('development')"),
+      sql("select empno,empname from alldatatypescubeFilter_hive where regexp_replace(workgroupcategoryname, 'er', 'ment') NOT IN ('development')"))
+  }
+  
+  test("select empno,empname from alldatatypescubeFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'") {
+    checkAnswer(
+      sql("select empno,empname from alldatatypescubeFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'"),
+      sql("select empno,empname from alldatatypescubeFilter_hive where regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'"))
+  }
+  
   override def afterAll {
     sql("drop table alldatatypescubeFilter")
     sql("drop table alldatatypescubeFilter_hive")