You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/10/23 05:28:41 UTC

[carbondata] branch master updated: [CARBONDATA-3994] Skip Order by for map task if it is sort column and use limit pushdown for array_contains filter

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b9f00b  [CARBONDATA-3994] Skip Order by for map task if it is sort column and use limit pushdown for array_contains filter
8b9f00b is described below

commit 8b9f00b72b4374fca80a5a0983b2707b58e29c75
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Wed Sep 16 16:12:20 2020 +0530

    [CARBONDATA-3994] Skip Order by for map task if it is sort column and use limit pushdown for array_contains filter
    
    Why is this PR needed?
    To improve query performance in specific scenarios these changes are proposed.
    
    What changes were proposed in this PR?
    When the order by column is in sort column, every map task output will be already sorted. No need to sort the data again.
    Hence skipping the order at map task by changing plan node from TakeOrderedAndProject --> CarbonTakeOrderedAndProjectExec
    Also in this scenario collecting the limit at map task and Array_contains() will use this limit value for row scan filtering to break scan once limit value is reached.
    Also added a carbon property to control this .
    carbon.mapOrderPushDown.<db_name>_<table_name>.column
    
    Note: later we can improve this for other filters also to use the limit value.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3932
    
    Co-authored-by: li36909 <36...@qq.com>
---
 .../core/constants/CarbonCommonConstants.java      |   8 ++
 .../apache/carbondata/core/index/IndexFilter.java  |  15 +++
 .../carbondata/core/scan/filter/FilterUtil.java    |   6 +-
 .../executer/RowLevelFilterExecutorImpl.java       |  16 ++-
 ...velRangeGreaterThanEqualFilterExecutorImpl.java |   2 +-
 ...RowLevelRangeGreaterThanFilterExecutorImpl.java |   2 +-
 ...wLevelRangeLessThanEqualFilterExecutorImpl.java |   2 +-
 .../RowLevelRangeLessThanFilterExecutorImpl.java   |   2 +-
 .../resolver/RowLevelFilterResolverImpl.java       |  16 ++-
 .../apache/carbondata/core/util/SessionParams.java |   2 +
 docs/configuration-parameters.md                   |   4 +-
 .../filter/executor/PolygonFilterExecutorImpl.java |   2 +-
 .../spark/sql/CarbonDatasourceHadoopRelation.scala |  10 +-
 .../CarbonTakeOrderedAndProjectExec.scala          | 125 +++++++++++++++++++++
 .../strategy/CarbonLateDecodeStrategy.scala        |  83 ++++++++++++++
 .../complexType/TestArrayContainsPushDown.scala    |  24 ++++
 16 files changed, 304 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index d9ed63a..15139ec 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2575,4 +2575,12 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_REORDER_FILTER_DEFAULT = "true";
 
+  /**
+   * If order by column is in sort column,
+   * specify that sort column here to avoid ordering at map task.
+   * Also the limit value can be used for row scanning to scan the data only till the limit.
+   */
+  @CarbonProperty(dynamicConfigurable = true)
+  public static final String CARBON_MAP_ORDER_PUSHDOWN = "carbon.mapOrderPushDown";
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java b/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java
index cbb41c1..c3fd782 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
 import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptimizer;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 
@@ -62,6 +63,9 @@ public class IndexFilter implements Serializable {
 
   private SegmentProperties properties;
 
+  // limit value used for row scanning, collected when carbon.mapOrderPushDown is enabled
+  private int limit = -1;
+
   public IndexFilter(CarbonTable table, Expression expression) {
     this(table, expression, false);
   }
@@ -91,6 +95,14 @@ public class IndexFilter implements Serializable {
     resolve(false);
   }
 
+  public int getLimit() {
+    return limit;
+  }
+
+  public void setLimit(int limit) {
+    this.limit = limit;
+  }
+
   Expression getNewCopyOfExpression() {
     if (expression != null) {
       try {
@@ -189,6 +201,9 @@ public class IndexFilter implements Serializable {
   public FilterResolverIntf getResolver() {
     if (resolver == null) {
       resolver = resolveFilter();
+      if (resolver instanceof RowLevelFilterResolverImpl) {
+        ((RowLevelFilterResolverImpl)resolver).setLimit(limit);
+      }
     }
     return resolver;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index af8866b..9dc6b5b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -203,7 +203,8 @@ public final class FilterUtil {
                   .getMsrColEvalutorInfoList(),
               ((RowLevelFilterResolverImpl) filterExpressionResolverTree).getFilterExpresion(),
               ((RowLevelFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
-              segmentProperties, complexDimensionInfoMap);
+              segmentProperties, complexDimensionInfoMap,
+              ((RowLevelFilterResolverImpl) filterExpressionResolverTree).getLimit());
 
       }
     }
@@ -212,7 +213,8 @@ public final class FilterUtil {
         ((RowLevelFilterResolverImpl) filterExpressionResolverTree).getMsrColEvalutorInfoList(),
         ((RowLevelFilterResolverImpl) filterExpressionResolverTree).getFilterExpresion(),
         ((RowLevelFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
-        segmentProperties, complexDimensionInfoMap);
+        segmentProperties, complexDimensionInfoMap,
+        ((RowLevelFilterResolverImpl) filterExpressionResolverTree).getLimit());
 
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecutorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecutorImpl.java
index de31fdd..7ba181a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecutorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecutorImpl.java
@@ -113,10 +113,15 @@ public class RowLevelFilterExecutorImpl implements FilterExecutor {
    */
   private DirectDictionaryGenerator dateDictionaryGenerator;
 
+  // limit value used for row scanning, collected when carbon.mapOrderPushDown is enabled
+  // TODO: right now this is used only for Array_contains() filter,
+  // later we can make use of it for all row level filters.
+  private int limit;
+
   public RowLevelFilterExecutorImpl(List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
       List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
       AbsoluteTableIdentifier tableIdentifier, SegmentProperties segmentProperties,
-      Map<Integer, GenericQueryType> complexDimensionInfoMap) {
+      Map<Integer, GenericQueryType> complexDimensionInfoMap, int limit) {
     this.segmentProperties = segmentProperties;
     if (null == dimColEvaluatorInfoList) {
       this.dimColEvaluatorInfoList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -147,6 +152,7 @@ public class RowLevelFilterExecutorImpl implements FilterExecutor {
     this.complexDimensionInfoMap = complexDimensionInfoMap;
     this.dateDictionaryGenerator =
         DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(DataTypes.DATE);
+    this.limit = limit;
     initDimensionChunkIndexes();
     initMeasureChunkIndexes();
   }
@@ -266,8 +272,12 @@ public class RowLevelFilterExecutorImpl implements FilterExecutor {
           literalExpDataType);
       ArrayQueryType complexType =
           (ArrayQueryType) complexDimensionInfoMap.get(dimensionChunkIndex[0]);
+      int totalCount = 0;
       // check all the pages
       for (int i = 0; i < pageNumbers; i++) {
+        if (limit != -1 && totalCount >= limit) {
+          break;
+        }
         BitSet set = new BitSet(numberOfRows[i]);
         int[][] numberOfChild = complexType
             .getNumberOfChild(rawBlockletColumnChunks.getDimensionRawColumnChunks(), null,
@@ -277,12 +287,16 @@ public class RowLevelFilterExecutorImpl implements FilterExecutor {
                 null, i);
         // check every row
         for (int index = 0; index < numberOfRows[i]; index++) {
+          if (limit != -1 && totalCount >= limit) {
+            break;
+          }
           int dataOffset = numberOfChild[index][1];
           // loop the children
           for (int j = 0; j < numberOfChild[index][0]; j++) {
             byte[] obj = page.getChunkData(dataOffset++);
             if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(obj, filterValueInBytes) == 0) {
               set.set(index);
+              totalCount++;
               break;
             }
           }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanEqualFilterExecutorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanEqualFilterExecutorImpl.java
index 2326334..3857a2b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanEqualFilterExecutorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanEqualFilterExecutorImpl.java
@@ -62,7 +62,7 @@ public class RowLevelRangeGreaterThanEqualFilterExecutorImpl extends RowLevelFil
       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
       Object[] msrFilterRangeValues, SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
-        null);
+        null, -1);
     this.filterRangeValues = filterRangeValues;
     this.msrFilterRangeValues = msrFilterRangeValues;
     if (!msrColEvalutorInfoList.isEmpty()) {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanFilterExecutorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanFilterExecutorImpl.java
index 0849e5e..50ce0ca 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanFilterExecutorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanFilterExecutorImpl.java
@@ -62,7 +62,7 @@ public class RowLevelRangeGreaterThanFilterExecutorImpl extends RowLevelFilterEx
       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
       Object[] msrFilterRangeValues, SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvoluatorInfoList, exp, tableIdentifier, segmentProperties,
-        null);
+        null, -1);
     this.filterRangeValues = filterRangeValues;
     this.msrFilterRangeValues = msrFilterRangeValues;
     if (!this.msrColEvalutorInfoList.isEmpty()) {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecutorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecutorImpl.java
index b22a91d..f4800be 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecutorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecutorImpl.java
@@ -61,7 +61,7 @@ public class RowLevelRangeLessThanEqualFilterExecutorImpl extends RowLevelFilter
       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
       Object[] msrFilterRangeValues, SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
-        null);
+        null, -1);
     this.filterRangeValues = filterRangeValues;
     this.msrFilterRangeValues = msrFilterRangeValues;
     if (!msrColEvalutorInfoList.isEmpty()) {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecutorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecutorImpl.java
index e2cddca..125e3a9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecutorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecutorImpl.java
@@ -61,7 +61,7 @@ public class RowLevelRangeLessThanFilterExecutorImpl extends RowLevelFilterExecu
       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
       Object[] msrFilterRangeValues, SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvaluatorInfoList, exp, tableIdentifier, segmentProperties,
-        null);
+        null, -1);
     this.filterRangeValues = filterRangeValues;
     this.msrFilterRangeValues = msrFilterRangeValues;
     if (!msrColEvaluatorInfoList.isEmpty()) {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelFilterResolverImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelFilterResolverImpl.java
index 3672f2d..8e22c9b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelFilterResolverImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelFilterResolverImpl.java
@@ -36,17 +36,25 @@ public class RowLevelFilterResolverImpl extends ConditionalFilterResolverImpl {
   private List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList;
   private List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList;
   private AbsoluteTableIdentifier tableIdentifier;
+  // limit value used for row scanning, collected when carbon.mapOrderPushDown is enabled
+  private int limit = -1;
 
   public RowLevelFilterResolverImpl(Expression exp, boolean isExpressionResolve,
       boolean isIncludeFilter, AbsoluteTableIdentifier tableIdentifier) {
     super(exp, isExpressionResolve, isIncludeFilter, false);
-    dimColEvaluatorInfoList =
-        new ArrayList<DimColumnResolvedFilterInfo>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    msrColEvalutorInfoList = new ArrayList<MeasureColumnResolvedFilterInfo>(
-        CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    dimColEvaluatorInfoList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    msrColEvalutorInfoList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     this.tableIdentifier = tableIdentifier;
   }
 
+  public int getLimit() {
+    return limit;
+  }
+
+  public void setLimit(int limit) {
+    this.limit = limit;
+  }
+
   /**
    * Method which will resolve the filter expression by converting the filter member
    * to its assigned dictionary values.
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 0fd4e82..c611256 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -229,6 +229,8 @@ public class SessionParams implements Serializable, Cloneable {
           }
         } else if (key.equalsIgnoreCase(CARBON_REORDER_FILTER)) {
           isValid = true;
+        } else if (key.startsWith(CARBON_MAP_ORDER_PUSHDOWN)) {
+          isValid = true;
         } else {
           throw new InvalidConfigurationException(
               "The key " + key + " not supported for dynamic configuration.");
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 70fc3f3..f38983b 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -147,6 +147,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.si.lookup.partialstring | true | When true, it includes starts with, ends with and contains. When false, it includes only starts with secondary indexes. |
 | carbon.max.pagination.lru.cache.size.in.mb | -1 | Maximum memory **(in MB)** upto which the SDK pagination reader can cache the blocklet rows. Suggest to configure as multiple of blocklet size. Default value of -1 means there is no memory limit for caching. Only integer values greater than 0 are accepted. |
 | carbon.partition.max.driver.lru.cache.size | -1 | Maximum memory **(in MB)** upto which driver can cache partition metadata. Beyond this, least recently used data will be removed from cache before loading new set of values.
+| carbon.mapOrderPushDown.<db_name>_<table_name>.column| empty | If order by column is in sort column, specify that sort column here to avoid ordering at map task . |
 
 ## Data Mutation Configuration
 | Parameter | Default Value | Description |
@@ -231,7 +232,8 @@ RESET
 | carbon.index.visible.<db_name>.<table_name>.<index_name> | To specify query on ***db_name.table_name*** to not use the index ***index_name***. |
 | carbon.load.indexes.parallel.<db_name>.<table_name> | To enable parallel index loading for a table. when db_name.table_name are not specified, i.e., when ***carbon.load.indexes.parallel.*** is set, it applies for all the tables of the session. |
 | carbon.enable.index.server                | To use index server for caching and pruning. This property can be used for a session or for a particular table with ***carbon.enable.index.server.<db_name>.<table_name>***. |
-| carbon.reorder.filter                     | This property can be used to enabled/disable filter reordering. Should be disabled only when the user has optimized the filter condition.
+| carbon.reorder.filter                     | This property can be used to enabled/disable filter reordering. Should be disabled only when the user has optimized the filter condition. | 
+| carbon.mapOrderPushDown.<db_name>_<table_name>.column | If order by column is in sort column, specify that sort column here to avoid ordering at map task . |
 **Examples:**
 
 * Add or Update:
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java b/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
index 230884e..c5a7317 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
@@ -41,7 +41,7 @@ public class PolygonFilterExecutorImpl extends RowLevelFilterExecutorImpl {
       AbsoluteTableIdentifier tableIdentifier, SegmentProperties segmentProperties,
       Map<Integer, GenericQueryType> complexDimensionInfoMap) {
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
-        complexDimensionInfoMap);
+        complexDimensionInfoMap, -1);
   }
 
   private int getNearestRangeIndex(List<Long[]> ranges, long searchForNumber) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index b0b0742..10335d9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -46,7 +46,8 @@ case class CarbonDatasourceHadoopRelation(
     sparkSession: SparkSession,
     paths: Array[String],
     parameters: Map[String, String],
-    tableSchema: Option[StructType])
+    tableSchema: Option[StructType],
+    limit: Int = -1)
   extends BaseRelation with InsertableRelation {
 
   val caseInsensitiveMap: Map[String, String] = parameters.map(f => (f._1.toLowerCase, f._2))
@@ -93,10 +94,15 @@ case class CarbonDatasourceHadoopRelation(
     }
 
     val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics
+    val filter = filterExpression.map(new IndexFilter(carbonTable, _, true)).orNull
+    if (filter != null && filters.length == 1) {
+      // push down the limit if only one filter
+      filter.setLimit(limit)
+    }
     new CarbonScanRDD(
       sparkSession,
       projection,
-      filterExpression.map(new IndexFilter(carbonTable, _, true)).orNull,
+      filter,
       identifier,
       carbonTable.getTableInfo.serialize(),
       carbonTable.getTableInfo,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/CarbonTakeOrderedAndProjectExec.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/CarbonTakeOrderedAndProjectExec.scala
new file mode 100644
index 0000000..5a5e93a
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/CarbonTakeOrderedAndProjectExec.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.util.Utils
+
+// To skip the order at map task
+case class CarbonTakeOrderedAndProjectExec(
+    limit: Int,
+    sortOrder: Seq[SortOrder],
+    projectList: Seq[NamedExpression],
+    child: SparkPlan,
+    skipMapOrder: Boolean = false,
+    readFromHead: Boolean = true) extends UnaryExecNode {
+
+  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+
+  override def executeCollect(): Array[InternalRow] = {
+    val ordering = new LazilyGeneratedOrdering(sortOrder, child.output)
+    val rdd = child.execute().map(_.copy())
+    val data = takeOrdered(rdd, limit, skipMapOrder, readFromHead)(ordering)
+    if (projectList != child.output) {
+      val projection = UnsafeProjection.create(projectList, child.output)
+      data.map(r => projection(r).copy())
+    } else {
+      data
+    }
+  }
+
+  def takeOrdered(rdd: RDD[InternalRow],
+      num: Int,
+      skipMapOrder: Boolean = false,
+      readFromHead: Boolean = true)(implicit ord: Ordering[InternalRow]): Array[InternalRow] = {
+    if (!skipMapOrder) {
+      return rdd.takeOrdered(num)(ord)
+    }
+    // new ShuffledRowRDD by skipping the order at map task as column data is already sorted
+    if (num == 0) {
+      Array.empty
+    } else {
+      val mapRDDs = rdd.mapPartitions { items =>
+        if (readFromHead) {
+          items.slice(0, num)
+        } else {
+          items.drop(items.size - num)
+        }
+      }
+      if (mapRDDs.partitions.length == 0) {
+        Array.empty
+      } else {
+        mapRDDs.collect().sorted(ord)
+      }
+    }
+  }
+
+  override def outputOrdering: Seq[SortOrder] = sortOrder
+
+  override def outputPartitioning: Partitioning = SinglePartition
+
+  override def simpleString: String = {
+    val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]")
+    val outputString = Utils.truncatedString(output, "[", ",", "]")
+
+    s"CarbonTakeOrderedAndProjectExec(limit=$limit, orderBy=$orderByString, " +
+    s"skipMapOrder=$skipMapOrder, readFromHead=$readFromHead, output=$outputString)"
+  }
+
+  override def output: Seq[Attribute] = {
+    projectList.map(_.toAttribute)
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
+    val localTopK: RDD[InternalRow] = {
+      child.execute().map(_.copy()).mapPartitions { iter =>
+        if (skipMapOrder) {
+          // new ShuffledRowRDD by skipping the order at map task as column data is already sorted
+          if (readFromHead) {
+            iter.slice(0, limit)
+          } else {
+            iter.drop(iter.size - limit)
+          }
+        } else {
+          org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord)
+        }
+      }
+    }
+    // update with modified RDD (localTopK)
+    val shuffled = new ShuffledRowRDD(
+      ShuffleExchangeExec.prepareShuffleDependency(
+        localTopK, child.output, SinglePartition, serializer))
+    shuffled.mapPartitions { iter =>
+      val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
+      if (projectList != child.output) {
+        val projection = UnsafeProjection.create(projectList, child.output)
+        topK.map(r => projection(r))
+      } else {
+        topK
+      }
+    }
+  }
+
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 415e19a..4b062e9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.strategy
 
+import java.util.Locale
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -187,6 +189,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
           planLater(right),
           condition)
         condition.map(FilterExec(_, pushedDownJoin)).getOrElse(pushedDownJoin) :: Nil
+      case ExtractTakeOrderedAndProjectExec(carbonTakeOrderedAndProjectExec) =>
+        carbonTakeOrderedAndProjectExec :: Nil
       case _ => Nil
     }
   }
@@ -982,6 +986,85 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
           null)(sparkSession)
     }
   }
+
+  object ExtractTakeOrderedAndProjectExec {
+
+    def unapply(plan: LogicalPlan): Option[CarbonTakeOrderedAndProjectExec] = {
+      val allRelations = plan.collect { case logicalRelation: LogicalRelation => logicalRelation }
+      // push down order by limit to carbon map task,
+      // only when there are only one CarbonDatasourceHadoopRelation
+      if (allRelations.size != 1 ||
+          allRelations.exists(x => !x.relation.isInstanceOf[CarbonDatasourceHadoopRelation])) {
+        return None
+      }
+      //  check and Replace TakeOrderedAndProject (physical plan node for order by + limit)
+      //  with CarbonTakeOrderedAndProjectExec.
+      val relation = allRelations.head.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+      plan match {
+        case ReturnAnswer(rootPlan) => rootPlan match {
+          case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
+            carbonTakeOrder(relation, limit,
+              order,
+              child.output,
+              planLater(pushLimit(limit, child)))
+          case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) =>
+            carbonTakeOrder(relation, limit, order, projectList, planLater(pushLimit(limit, child)))
+          case _ => None
+        }
+        case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
+          carbonTakeOrder(relation, limit, order, child.output, planLater(pushLimit(limit, child)))
+        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) =>
+          carbonTakeOrder(relation, limit, order, projectList, planLater(pushLimit(limit, child)))
+        case _ => None
+      }
+    }
+
+    private def carbonTakeOrder(relation: CarbonDatasourceHadoopRelation,
+        limit: Int,
+        orders: Seq[SortOrder],
+        projectList: Seq[NamedExpression],
+        child: SparkPlan): Option[CarbonTakeOrderedAndProjectExec] = {
+      val latestOrder = orders.last
+      val fromHead: Boolean = latestOrder.direction match {
+        case Ascending => true
+        case Descending => false
+      }
+      val (columnName, canPushDown) = latestOrder.child match {
+        case attr: AttributeReference => (attr.name, true)
+        case Alias(AttributeReference(name, _, _, _), _) => (name, true)
+        case _ => (null, false)
+      }
+      val mapOrderPushDown = CarbonProperties.getInstance.getProperty(
+        CarbonCommonConstants.CARBON_MAP_ORDER_PUSHDOWN + "." +
+        s"${ relation.carbonTable.getTableUniqueName.toLowerCase(Locale.ROOT) }.column")
+      // when this property is enabled and order by column is in sort column,
+      // enable limit push down to map task, row scanner can use this limit.
+      val sortColumns = relation.carbonTable.getSortColumns
+      if (mapOrderPushDown != null && canPushDown && sortColumns.size() > 0
+          && sortColumns.get(0).equalsIgnoreCase(columnName)
+          && mapOrderPushDown.equalsIgnoreCase(columnName)) {
+        // Replace TakeOrderedAndProject (which comes after physical plan with limit and order by)
+        // with CarbonTakeOrderedAndProjectExec.
+        // which will skip the order at map task as column data is already sorted
+        Some(CarbonTakeOrderedAndProjectExec(limit, orders, projectList, child, skipMapOrder =
+          true, readFromHead = fromHead))
+      } else {
+        None
+      }
+    }
+
+    def pushLimit(limit: Int, plan: LogicalPlan): LogicalPlan = {
+      val newPlan = plan transform {
+        case lr: LogicalRelation =>
+          val newRelation = lr.copy(relation = lr.relation
+            .asInstanceOf[CarbonDatasourceHadoopRelation]
+            .copy(limit = limit))
+          newRelation
+        case other => other
+      }
+      newPlan
+    }
+  }
 }
 
 class CarbonPhysicalPlanException extends Exception
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestArrayContainsPushDown.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestArrayContainsPushDown.scala
index 226e05c..8ff94da 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestArrayContainsPushDown.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestArrayContainsPushDown.scala
@@ -66,6 +66,30 @@ class TestArrayContainsPushDown extends QueryTest with BeforeAndAfterAll {
     sql("drop table complex1")
   }
 
+  test("test array contains push down using limit " +
+       "in row level scan when order by column is sort column") {
+    sql("drop table if exists complex1")
+
+    sql("create table complex1 (id int, arr array<String>) " +
+        "stored as carbondata TBLPROPERTIES ('SORT_COLUMNS'='id')")
+    sql("insert into complex1 select 1, array('as') union all " +
+        "select 2, array('sd','df','gh') union all " +
+        "select 3, array('rt','ew','rtyu','jk',null) union all " +
+        "select 4, array('ghsf','dbv','','ty') union all " +
+        "select 5, array('hjsd','fggb','nhj','sd','asd')")
+
+    // enable the property
+    CarbonProperties.getInstance()
+      .addProperty("carbon.mapOrderPushDown.default_complex1.column", "id")
+    // check for CarbonTakeOrderedAndProjectExec node in plan
+    checkExistence(sql(
+      " explain select * from complex1 where array_contains(arr,'sd') order by id limit 1"),
+      true,
+      "CarbonTakeOrderedAndProjectExec")
+    CarbonProperties.getInstance().removeProperty("carbon.mapOrderPushDown.default_complex1.column")
+    sql("drop table complex1")
+  }
+
   test("test array contains pushdown for array of boolean") {
     sql("drop table if exists complex1")
     sql("create table complex1 (arr array<boolean>) stored as carbondata")