You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/29 06:29:33 UTC

[carbondata] branch master updated: [CARBONDATA-3397]Remove SparkUnknown Expression to Index Server

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 15bae6e  [CARBONDATA-3397]Remove SparkUnknown Expression to Index Server
15bae6e is described below

commit 15bae6e5848bc83d4a6f65499fe7dacf88f5a67a
Author: BJangir <ba...@gmail.com>
AuthorDate: Mon May 27 14:55:39 2019 +0530

    [CARBONDATA-3397]Remove SparkUnknown Expression to Index Server
    
    Problem
    If a query uses a UDF that is registered only in the main driver, the UDF function is not available in the Index Server, so the query fails in the Index Server (with NoClassDefFoundError).
    
    Solution
    1. UDFs are SparkUnknownFilter (RowLevelFilterExecuterImpl), so remove the SparkUnknown expression, because for pruning all blocks are selected anyway (see org.apache.carbondata.core.scan.filter.executer.RowLevelFilterExecuterImpl#isScanRequired).
    
    2. Supply all the UDF functions and their related lambda expressions to the Index Server as well. But this has the issues below:
    a. The Spark FunctionRegistry is not writable.
    b. Sending all functions from the main server to the Index Server would be costly (in size), and there is no way to distinguish implicit functions from explicit user-created functions.
    
    So Solution 1 is adopted.
    
    This closes #3238
---
 .../core/datamap/DistributableDataMapFormat.java   |  8 ++++
 .../scan/filter/FilterExpressionProcessor.java     | 43 ++++++++++++++++++++++
 .../carbondata/indexserver/DataMapJobs.scala       | 39 ++++++++++++++++++++
 3 files changed, 90 insertions(+)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index f76cfec..57540e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -334,4 +334,12 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
   public boolean isJobToClearDataMaps() {
     return isJobToClearDataMaps;
   }
+
+  public FilterResolverIntf getFilterResolverIntf() {
+    return filterResolverIntf;
+  }
+
+  public void setFilterResolverIntf(FilterResolverIntf filterResolverIntf) {
+    this.filterResolverIntf = filterResolverIntf;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index fd75496..493e7e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -42,6 +42,8 @@ import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
 import org.apache.carbondata.core.scan.expression.conditional.StartsWithExpression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.core.scan.expression.logical.TrueExpression;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
@@ -487,4 +489,45 @@ public class FilterExpressionProcessor implements FilterProcessor {
       return !bitSet.isEmpty();
     }
   }
+
+  /**
+   * Rebuilds the expression tree, replacing every UNKNOWN expression with a TrueExpression.
+   *
+   * @param expressionTree root of the expression tree to rewrite
+   * @return the rewritten tree; nodes other than OR, AND and UNKNOWN are returned as-is
+   */
+  public Expression removeUnknownExpression(Expression expressionTree) {
+    ExpressionType filterExpressionType = expressionTree.getFilterExpressionType();
+    BinaryExpression currentExpression = null;
+    switch (filterExpressionType) {
+      case OR:
+        currentExpression = (BinaryExpression) expressionTree;
+        return new OrExpression(
+                removeUnknownExpression(currentExpression.getLeft()),
+                removeUnknownExpression(currentExpression.getRight())
+        );
+      case AND:
+        currentExpression = (BinaryExpression) expressionTree;
+        return new AndExpression(
+                removeUnknownExpression(currentExpression.getLeft()),
+                removeUnknownExpression(currentExpression.getRight())
+        );
+      case UNKNOWN:
+        return new TrueExpression(null);
+      default:
+        return expressionTree;
+    }
+  }
+
+  /**
+   * Builds a TrueExpression filter resolver to substitute for an unknown-expression resolver.
+   *
+   * @param tableIdentifier identifier passed on to getFilterResolverBasedOnExpressionType
+   * @return the resolver built for ExpressionType.TRUE
+   */
+  public FilterResolverIntf changeUnknownResloverToTrue(AbsoluteTableIdentifier tableIdentifier) {
+    return getFilterResolverBasedOnExpressionType(ExpressionType.TRUE, false,
+        new TrueExpression(null), tableIdentifier, new TrueExpression(null));
+
+  }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 57bdf34..0ee4ebb 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -26,6 +26,11 @@ import org.apache.spark.util.SizeEstimator
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datamap.{AbstractDataMapJob, DistributableDataMapFormat}
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.scan.expression.BinaryExpression
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.scan.filter.resolver.{FilterResolverIntf, LogicalFilterResolverImpl, RowLevelFilterResolverImpl}
 import org.apache.carbondata.spark.util.CarbonScalaUtil.logTime
 
 /**
@@ -42,11 +47,45 @@ class DistributedDataMapJob extends AbstractDataMapJob {
       LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
     }
     val (resonse, time) = logTime {
+      var filterInf = dataMapFormat.getFilterResolverIntf
+      val filterProcessor = new FilterExpressionProcessor
+      filterInf = removeSparkUnknown(filterInf,
+        dataMapFormat.getCarbonTable.getAbsoluteTableIdentifier, filterProcessor)
+      dataMapFormat.setFilterResolverIntf(filterInf)
       IndexServer.getClient.getSplits(dataMapFormat).toList.asJava
     }
     LOGGER.info(s"Time taken to get response from server: $time ms")
     resonse
   }
+
+  /**
+   * Recursively walks the filter resolver tree, rebuilding each LogicalFilterResolverImpl.
+   *   a. Only a RowLevelFilterResolverImpl whose executer type is UNKNOWN (the Spark unknown
+   * expression) is replaced; any other non-logical resolver is returned unchanged.
+   *   b. The Spark unknown expression is changed to a TrueExpression so that isScanRequired
+   * selects the block/blocklet.
+   *
+   * @param filterInf       filter resolver to be changed
+   * @param tableIdentifer  AbsoluteTableIdentifier object
+   * @param filterProcessor processor used to rewrite expressions and build the TRUE resolver
+   * @return the filter resolver with Spark unknown expressions replaced
+   */
+  def removeSparkUnknown(filterInf: FilterResolverIntf,
+      tableIdentifer: AbsoluteTableIdentifier,
+                         filterProcessor: FilterExpressionProcessor): FilterResolverIntf = {
+    if (filterInf.isInstanceOf[LogicalFilterResolverImpl]) {
+      return new LogicalFilterResolverImpl(
+        removeSparkUnknown(filterInf.getLeft, tableIdentifer, filterProcessor),
+        removeSparkUnknown(filterInf.getRight, tableIdentifer, filterProcessor),
+        filterProcessor.removeUnknownExpression(filterInf.getFilterExpression).
+          asInstanceOf[BinaryExpression])
+    }
+    if (filterInf.isInstanceOf[RowLevelFilterResolverImpl] &&
+      filterInf.getFilterExecuterType == ExpressionType.UNKNOWN) {
+      return filterProcessor.changeUnknownResloverToTrue(tableIdentifer)
+    }
+    return filterInf;
+  }
 }
 
 /**