You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/09 07:58:06 UTC

[04/35] carbondata git commit: [CARBONDATA-2134] Prevent implicit column filter list from getting serialized while submitting task to executor

[CARBONDATA-2134] Prevent implicit column filter list from getting serialized while submitting task to executor

Problem
In the current store, blocklet pruning is done in the driver and no further pruning takes place on the executor side. However, the implicit column filter list is still sent to the executors. As the list grows, the cost of serializing and deserializing it increases, which can impact query performance.

Solution
Remove the list from the filter expression before submitting the task to executor.

This closes #1935


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/11a795ce
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/11a795ce
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/11a795ce

Branch: refs/heads/carbonstore-rebase
Commit: 11a795ceca80e74de8264cba3571ca78ce03fae4
Parents: 7c6c42f
Author: m00258959 <ma...@huawei.com>
Authored: Mon Feb 5 17:10:18 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Feb 9 13:03:05 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/scan/filter/FilterUtil.java | 35 ++++++++++++-
 .../core/scan/filter/FilterUtilTest.java        | 48 ++++++++++++++++++
 .../org/apache/carbondata/spark/util/Util.java  | 19 +++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 52 +++++++++++++++++++-
 4 files changed, 152 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/11a795ce/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index a08edc0..689da9f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -67,9 +67,12 @@ import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.ExpressionResult;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.InExpression;
 import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
 import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.TrueExpression;
 import org.apache.carbondata.core.scan.filter.executer.AndFilterExecuterImpl;
 import org.apache.carbondata.core.scan.filter.executer.DimColumnExecuterFilterInfo;
 import org.apache.carbondata.core.scan.filter.executer.ExcludeColGroupFilterExecuterImpl;
@@ -1824,4 +1827,34 @@ public final class FilterUtil {
     }
     return columnFilterInfo;
   }
-}
\ No newline at end of file
+
+  /**
+   * Strips the implicit positionID IN-filter from the given filter tree. When the root is an
+   * AND node whose right child is an InExpression on the positionID column, that child is
+   * replaced by a TrueExpression, so the ListExpression holding the IN values is no longer
+   * part of the tree and is not serialized when the filter is shipped to executors.
+   *
+   * @param expression root of the filter expression tree; modified in place
+   */
+  public static void removeInExpressionNodeWithPositionIdColumn(Expression expression) {
+    if (ExpressionType.AND != expression.getFilterExpressionType()) {
+      return;
+    }
+    Expression rightNode = ((AndExpression) expression).getRight();
+    if (!(rightNode instanceof InExpression)) {
+      return;
+    }
+    List<Expression> inChildren = rightNode.getChildren();
+    if (null == inChildren || inChildren.isEmpty()) {
+      return;
+    }
+    Expression firstChild = inChildren.get(0);
+    if (firstChild instanceof ColumnExpression && ((ColumnExpression) firstChild)
+        .getColumnName().equalsIgnoreCase(CarbonCommonConstants.POSITION_ID)) {
+      // swap the IN node for a constant TRUE node; the left side of the AND stays as it is
+      expression.findAndSetChild(rightNode, new TrueExpression(null));
+      LOGGER.info("In expression removed from the filter expression list to prevent it from"
+          + " serializing on executor");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11a795ce/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java b/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
index 89b3122..565da04 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
@@ -35,8 +35,11 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.InExpression;
 import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.TrueExpression;
 import org.apache.carbondata.core.scan.filter.intf.RowImpl;
 import org.apache.carbondata.core.util.BitSetGroup;
 
@@ -399,4 +402,49 @@ public class FilterUtilTest extends AbstractDictionaryCacheTest {
         FilterUtil.createBitSetGroupWithDefaultValue(15, 448200, true);
     assertTrue(bitSetGroupWithDefaultValue.getNumberOfPages() == 15);
   }
+
+  @Test public void testRemoveInExpressionNodeWithPositionIdColumn() {
+    List<Expression> children = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    // create literal expression
+    LiteralExpression literalExpression =
+        new LiteralExpression("0/1/0-0_batchno0-0-1517808273200/0", DataTypes.STRING);
+    children.add(literalExpression);
+    // create list expression
+    ListExpression listExpression = new ListExpression(children);
+    // create column expression with column name as positionId
+    ColumnExpression columnExpression =
+        new ColumnExpression(CarbonCommonConstants.POSITION_ID, DataTypes.STRING);
+    // create InExpression as right node
+    InExpression inExpression = new InExpression(columnExpression, listExpression);
+    // create a dummy true expression as left node
+    TrueExpression trueExpression = new TrueExpression(null);
+    // create and expression as the root node
+    Expression expression = new AndExpression(trueExpression, inExpression);
+    // test remove expression method
+    FilterUtil.removeInExpressionNodeWithPositionIdColumn(expression);
+    // right node must now be a TrueExpression (assertTrue is checked even without -ea)
+    assertTrue(((AndExpression) expression).getRight() instanceof TrueExpression);
+  }
+
+  @Test public void testRemoveInExpressionNodeWithDifferentColumn() {
+    List<Expression> children = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    // create literal expression
+    LiteralExpression literalExpression =
+        new LiteralExpression("testName", DataTypes.STRING);
+    children.add(literalExpression);
+    // create list expression
+    ListExpression listExpression = new ListExpression(children);
+    // create column expression on a regular column, i.e. not the positionId column
+    ColumnExpression columnExpression = new ColumnExpression("name", DataTypes.STRING);
+    // create InExpression as right node
+    InExpression inExpression = new InExpression(columnExpression, listExpression);
+    // create a dummy true expression as left node
+    TrueExpression trueExpression = new TrueExpression(null);
+    // create and expression as the root node
+    Expression expression = new AndExpression(trueExpression, inExpression);
+    // test remove expression method
+    FilterUtil.removeInExpressionNodeWithPositionIdColumn(expression);
+    // right node must be left untouched (assertTrue is checked even without -ea)
+    assertTrue(((AndExpression) expression).getRight() instanceof InExpression);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11a795ce/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
index 8c14cd3..cd2b81c 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
@@ -17,6 +17,10 @@
 
 package org.apache.carbondata.spark.util;
 
+import java.util.List;
+
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+
 import org.apache.spark.SparkConf;
 import org.apache.spark.util.Utils;
 
@@ -27,4 +31,19 @@ public class Util {
   public static String[] getConfiguredLocalDirs(SparkConf conf) {
     return Utils.getConfiguredLocalDirs(conf);
   }
+
+  /**
+   * Reports whether at least one split in the list has a detail info without blocklet info.
+   * @param splitList splits whose detail info is inspected
+   * @return true on the first split lacking blocklet info, false if every split has it
+   */
+  public static boolean isBlockWithoutBlockletInfoExists(List<CarbonInputSplit> splitList) {
+    java.util.Iterator<CarbonInputSplit> splitIterator = splitList.iterator();
+    while (splitIterator.hasNext()) {
+      if (splitIterator.next().getDetailInfo().getBlockletInfo() == null) {
+        return true;
+      }
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11a795ce/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 102c6c8..e554a58 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
+import scala.util.control.Breaks.{break, breakable}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
@@ -42,6 +43,7 @@ import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.filter.FilterUtil
 import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
 import org.apache.carbondata.core.statusmanager.FileFormat
@@ -51,7 +53,7 @@ import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
-import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
+import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
 
 /**
  * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
@@ -109,6 +111,8 @@ class CarbonScanRDD(
       }
     }
     val batchPartitions = distributeColumnarSplits(columnarSplits)
+    // check and remove InExpression from filterExpression
+    checkAndRemoveInExpressinFromFilterExpression(format, batchPartitions)
     if (streamSplits.isEmpty) {
       batchPartitions.toArray
     } else {
@@ -471,6 +475,52 @@ class CarbonScanRDD(
   }
 
   /**
+   * Drops the InExpression from filterExpression so that its ListExpression values are not
+   * serialized to and deserialized on the executors. Nothing is changed when there is no
+   * filter, no partition, or a block without blocklet info among the partitions.
+   * @param format input format of the scan; not read by this method
+   * @param identifiedPartitions partitions whose splits are checked for blocklet info
+   */
+  private def checkAndRemoveInExpressinFromFilterExpression(
+      format: CarbonTableInputFormat[Object],
+      identifiedPartitions: mutable.Buffer[Partition]) = {
+    // && short-circuits, so the split scan only runs with a filter and some partitions
+    val removable = null != filterExpression && identifiedPartitions.nonEmpty &&
+                    !checkForBlockWithoutBlockletInfo(identifiedPartitions)
+    if (removable) {
+      FilterUtil.removeInExpressionNodeWithPositionIdColumn(filterExpression)
+    }
+  }
+
+  /**
+   * Checks whether any identified block comes from an old store (version 1.1 and below). A
+   * block that does not contain the blocklet info is treated as a block from the old store.
+   * A partition's split is either a CarbonMultiBlockSplit, whose splits are all checked, or
+   * a single CarbonInputSplit, which is checked on its own.
+   *
+   * @param identifiedPartitions partitions (CarbonSparkPartition) whose splits are inspected
+   * @return true if at least one block without blocklet info exists, false otherwise
+   */
+  private def checkForBlockWithoutBlockletInfo(
+      identifiedPartitions: mutable.Buffer[Partition]): Boolean = {
+    // exists stops at the first old-store block, so no var or breakable is required
+    identifiedPartitions.exists { partition =>
+      val splitList: java.util.List[CarbonInputSplit] =
+        partition.asInstanceOf[CarbonSparkPartition].split.value match {
+          case multiBlockSplit: CarbonMultiBlockSplit =>
+            multiBlockSplit.getAllSplits
+          case singleSplit =>
+            // build a real one-element list: ArrayList.add returns a Boolean, not the list,
+            // so its result must never be cast to java.util.List (ClassCastException)
+            java.util.Collections.singletonList[CarbonInputSplit](
+              singleSplit.asInstanceOf[CarbonInputSplit])
+        }
+      // check for block from old store (version 1.1 and below)
+      Util.isBlockWithoutBlockletInfoExists(splitList)
+    }
+  }
+
+  /**
    * Get the preferred locations where to launch this task.
    */
   override def getPreferredLocations(split: Partition): Seq[String] = {