Posted to commits@spark.apache.org by we...@apache.org on 2018/09/20 12:18:44 UTC

spark git commit: Revert [SPARK-19355][SPARK-25352]

Repository: spark
Updated Branches:
  refs/heads/master 7ff5386ed -> 89671a27e


Revert [SPARK-19355][SPARK-25352]

## What changes were proposed in this pull request?

This reverts the following sequence of PRs, based on the discussion and comments at https://github.com/apache/spark/pull/16677#issuecomment-422650759 (a short sketch of the reverted API change follows the list):

#22344
#22330
#22239
#16677
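
For reference, the core API change undone by these PRs is that `MapStatus` no longer carries a per-map-task record count (and `MapOutputStatistics` correspondingly loses `recordsByPartitionId`). A minimal before/after sketch, mirroring the test usage visible in the diff below; `MapStatus` is a `private[spark]` API, so this is illustrative only rather than user-facing code:

```scala
// Before the revert: the MapStatus factory took a per-map-task record count as a third argument.
MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L, 10000L), 10)

// After the revert: back to the block manager location plus per-reducer block sizes only.
MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L, 10000L))
```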

## How was this patch tested?

Existing tests.

Closes #22481 from viirya/revert-SPARK-19355-1.

Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/89671a27
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/89671a27
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/89671a27

Branch: refs/heads/master
Commit: 89671a27e783d77d4bfaec3d422cc8dd468ef04c
Parents: 7ff5386
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Thu Sep 20 20:18:31 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Sep 20 20:18:31 2018 +0800

----------------------------------------------------------------------
 .../sort/BypassMergeSortShuffleWriter.java      |   5 +-
 .../spark/shuffle/sort/UnsafeShuffleWriter.java |   3 +-
 .../org/apache/spark/MapOutputStatistics.scala  |   6 +-
 .../org/apache/spark/MapOutputTracker.scala     |  10 +-
 .../org/apache/spark/scheduler/MapStatus.scala  |  43 +++-----
 .../spark/shuffle/sort/SortShuffleWriter.scala  |   3 +-
 .../shuffle/sort/UnsafeShuffleWriterSuite.java  |   2 -
 .../apache/spark/MapOutputTrackerSuite.scala    |  28 ++---
 .../scala/org/apache/spark/ShuffleSuite.scala   |   1 -
 .../spark/scheduler/DAGSchedulerSuite.scala     |  10 +-
 .../apache/spark/scheduler/MapStatusSuite.scala |  16 +--
 .../spark/serializer/KryoSerializerSuite.scala  |   3 +-
 .../catalyst/plans/physical/partitioning.scala  |  14 ---
 .../org/apache/spark/sql/internal/SQLConf.scala |   9 --
 .../spark/sql/execution/SparkStrategies.scala   |  35 +++----
 .../exchange/ShuffleExchangeExec.scala          |   8 --
 .../org/apache/spark/sql/execution/limit.scala  | 104 +++----------------
 .../test/resources/sql-tests/inputs/limit.sql   |   2 -
 .../inputs/subquery/in-subquery/in-limit.sql    |   5 +-
 .../resources/sql-tests/results/limit.sql.out   |  92 ++++++++--------
 .../subquery/in-subquery/in-limit.sql.out       |  56 +++++-----
 .../spark/sql/DataFrameAggregateSuite.scala     |  12 +--
 .../org/apache/spark/sql/DataFrameSuite.scala   |  22 +---
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  11 +-
 .../execution/ExchangeCoordinatorSuite.scala    |   6 +-
 .../apache/spark/sql/execution/LimitSuite.scala |  81 ---------------
 .../spark/sql/execution/PlannerSuite.scala      |   4 +-
 .../execution/TakeOrderedAndProjectSuite.scala  |  85 ++++++---------
 .../hive/execution/HiveCompatibilitySuite.scala |   4 -
 .../spark/sql/hive/execution/PruningSuite.scala |   8 --
 30 files changed, 184 insertions(+), 504 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index e3bd549..323a5d3 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -125,7 +125,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     if (!records.hasNext()) {
       partitionLengths = new long[numPartitions];
       shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
-      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, 0);
+      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
       return;
     }
     final SerializerInstance serInstance = serializer.newInstance();
@@ -167,8 +167,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
         logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
       }
     }
-    mapStatus = MapStatus$.MODULE$.apply(
-      blockManager.shuffleServerId(), partitionLengths, writeMetrics.recordsWritten());
+    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 069e6d5..4839d04 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -248,8 +248,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
         logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
       }
     }
-    mapStatus = MapStatus$.MODULE$.apply(
-      blockManager.shuffleServerId(), partitionLengths, writeMetrics.recordsWritten());
+    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala
index ff85e11..f8a6f1d 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala
@@ -23,9 +23,5 @@ package org.apache.spark
  * @param shuffleId ID of the shuffle
  * @param bytesByPartitionId approximate number of output bytes for each map output partition
  *   (may be inexact due to use of compressed map statuses)
- * @param recordsByPartitionId number of output records for each map output partition
  */
-private[spark] class MapOutputStatistics(
-    val shuffleId: Int,
-    val bytesByPartitionId: Array[Long],
-    val recordsByPartitionId: Array[Long])
+private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 41575ce..1c4fa4b 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -522,19 +522,16 @@ private[spark] class MapOutputTrackerMaster(
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
     shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
       val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-      val recordsByMapTask = new Array[Long](statuses.length)
-
       val parallelAggThreshold = conf.get(
         SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
       val parallelism = math.min(
         Runtime.getRuntime.availableProcessors(),
         statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1).toInt
       if (parallelism <= 1) {
-        statuses.zipWithIndex.foreach { case (s, index) =>
+        for (s <- statuses) {
           for (i <- 0 until totalSizes.length) {
             totalSizes(i) += s.getSizeForBlock(i)
           }
-          recordsByMapTask(index) = s.numberOfOutput
         }
       } else {
         val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
@@ -551,11 +548,8 @@ private[spark] class MapOutputTrackerMaster(
         } finally {
           threadPool.shutdown()
         }
-        statuses.zipWithIndex.foreach { case (s, index) =>
-          recordsByMapTask(index) = s.numberOfOutput
-        }
       }
-      new MapOutputStatistics(dep.shuffleId, totalSizes, recordsByMapTask)
+      new MapOutputStatistics(dep.shuffleId, totalSizes)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 7e1d75f..659694d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -31,8 +31,7 @@ import org.apache.spark.util.Utils
 
 /**
  * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
- * task ran on, the sizes of outputs for each reducer, and the number of outputs of the map task,
- * for passing on to the reduce tasks.
+ * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
  */
 private[spark] sealed trait MapStatus {
   /** Location where this task was run. */
@@ -45,23 +44,18 @@ private[spark] sealed trait MapStatus {
    * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
    */
   def getSizeForBlock(reduceId: Int): Long
-
-  /**
-   * The number of outputs for the map task.
-   */
-  def numberOfOutput: Long
 }
 
 
 private[spark] object MapStatus {
 
-  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long], numOutput: Long): MapStatus = {
+  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
     if (uncompressedSizes.length >  Option(SparkEnv.get)
       .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
       .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)) {
-      HighlyCompressedMapStatus(loc, uncompressedSizes, numOutput)
+      HighlyCompressedMapStatus(loc, uncompressedSizes)
     } else {
-      new CompressedMapStatus(loc, uncompressedSizes, numOutput)
+      new CompressedMapStatus(loc, uncompressedSizes)
     }
   }
 
@@ -104,34 +98,29 @@ private[spark] object MapStatus {
  */
 private[spark] class CompressedMapStatus(
     private[this] var loc: BlockManagerId,
-    private[this] var compressedSizes: Array[Byte],
-    private[this] var numOutput: Long)
+    private[this] var compressedSizes: Array[Byte])
   extends MapStatus with Externalizable {
 
-  protected def this() = this(null, null.asInstanceOf[Array[Byte]], -1)  // For deserialization only
+  protected def this() = this(null, null.asInstanceOf[Array[Byte]])  // For deserialization only
 
-  def this(loc: BlockManagerId, uncompressedSizes: Array[Long], numOutput: Long) {
-    this(loc, uncompressedSizes.map(MapStatus.compressSize), numOutput)
+  def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+    this(loc, uncompressedSizes.map(MapStatus.compressSize))
   }
 
   override def location: BlockManagerId = loc
 
-  override def numberOfOutput: Long = numOutput
-
   override def getSizeForBlock(reduceId: Int): Long = {
     MapStatus.decompressSize(compressedSizes(reduceId))
   }
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
-    out.writeLong(numOutput)
     out.writeInt(compressedSizes.length)
     out.write(compressedSizes)
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    numOutput = in.readLong()
     val len = in.readInt()
     compressedSizes = new Array[Byte](len)
     in.readFully(compressedSizes)
@@ -154,20 +143,17 @@ private[spark] class HighlyCompressedMapStatus private (
     private[this] var numNonEmptyBlocks: Int,
     private[this] var emptyBlocks: RoaringBitmap,
     private[this] var avgSize: Long,
-    private var hugeBlockSizes: Map[Int, Byte],
-    private[this] var numOutput: Long)
+    private var hugeBlockSizes: Map[Int, Byte])
   extends MapStatus with Externalizable {
 
   // loc could be null when the default constructor is called during deserialization
   require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0,
     "Average size can only be zero for map stages that produced no output")
 
-  protected def this() = this(null, -1, null, -1, null, -1)  // For deserialization only
+  protected def this() = this(null, -1, null, -1, null)  // For deserialization only
 
   override def location: BlockManagerId = loc
 
-  override def numberOfOutput: Long = numOutput
-
   override def getSizeForBlock(reduceId: Int): Long = {
     assert(hugeBlockSizes != null)
     if (emptyBlocks.contains(reduceId)) {
@@ -182,7 +168,6 @@ private[spark] class HighlyCompressedMapStatus private (
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
-    out.writeLong(numOutput)
     emptyBlocks.writeExternal(out)
     out.writeLong(avgSize)
     out.writeInt(hugeBlockSizes.size)
@@ -194,7 +179,6 @@ private[spark] class HighlyCompressedMapStatus private (
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    numOutput = in.readLong()
     emptyBlocks = new RoaringBitmap()
     emptyBlocks.readExternal(in)
     avgSize = in.readLong()
@@ -210,10 +194,7 @@ private[spark] class HighlyCompressedMapStatus private (
 }
 
 private[spark] object HighlyCompressedMapStatus {
-  def apply(
-      loc: BlockManagerId,
-      uncompressedSizes: Array[Long],
-      numOutput: Long): HighlyCompressedMapStatus = {
+  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = {
     // We must keep track of which blocks are empty so that we don't report a zero-sized
     // block as being non-empty (or vice-versa) when using the average block size.
     var i = 0
@@ -254,6 +235,6 @@ private[spark] object HighlyCompressedMapStatus {
     emptyBlocks.trim()
     emptyBlocks.runOptimize()
     new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
-      hugeBlockSizesArray.toMap, numOutput)
+      hugeBlockSizesArray.toMap)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 91fc267..274399b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -70,8 +70,7 @@ private[spark] class SortShuffleWriter[K, V, C](
       val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
       val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
       shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
-      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths,
-        writeMetrics.recordsWritten)
+      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
     } finally {
       if (tmp.exists() && !tmp.delete()) {
         logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index faa70f2..0d5c5ea 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -233,7 +233,6 @@ public class UnsafeShuffleWriterSuite {
     writer.write(Iterators.emptyIterator());
     final Option<MapStatus> mapStatus = writer.stop(true);
     assertTrue(mapStatus.isDefined());
-    assertEquals(0, mapStatus.get().numberOfOutput());
     assertTrue(mergedOutputFile.exists());
     assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
     assertEquals(0, taskMetrics.shuffleWriteMetrics().recordsWritten());
@@ -253,7 +252,6 @@ public class UnsafeShuffleWriterSuite {
     writer.write(dataToWrite.iterator());
     final Option<MapStatus> mapStatus = writer.stop(true);
     assertTrue(mapStatus.isDefined());
-    assertEquals(NUM_PARTITITONS, mapStatus.get().numberOfOutput());
     assertTrue(mergedOutputFile.exists());
 
     long sumOfPartitionSizes = 0;

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index e797396..21f481d 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -62,9 +62,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
     val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
     tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
-        Array(1000L, 10000L), 10))
+        Array(1000L, 10000L)))
     tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
-        Array(10000L, 1000L), 10))
+        Array(10000L, 1000L)))
     val statuses = tracker.getMapSizesByExecutorId(10, 0)
     assert(statuses.toSet ===
       Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))),
@@ -84,9 +84,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     val compressedSize1000 = MapStatus.compressSize(1000L)
     val compressedSize10000 = MapStatus.compressSize(10000L)
     tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
-      Array(compressedSize1000, compressedSize10000), 10))
+      Array(compressedSize1000, compressedSize10000)))
     tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
-      Array(compressedSize10000, compressedSize1000), 10))
+      Array(compressedSize10000, compressedSize1000)))
     assert(tracker.containsShuffle(10))
     assert(tracker.getMapSizesByExecutorId(10, 0).nonEmpty)
     assert(0 == tracker.getNumCachedSerializedBroadcast)
@@ -107,9 +107,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     val compressedSize1000 = MapStatus.compressSize(1000L)
     val compressedSize10000 = MapStatus.compressSize(10000L)
     tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
-        Array(compressedSize1000, compressedSize1000, compressedSize1000), 10))
+        Array(compressedSize1000, compressedSize1000, compressedSize1000)))
     tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
-        Array(compressedSize10000, compressedSize1000, compressedSize1000), 10))
+        Array(compressedSize10000, compressedSize1000, compressedSize1000)))
 
     assert(0 == tracker.getNumCachedSerializedBroadcast)
     // As if we had two simultaneous fetch failures
@@ -145,7 +145,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
 
     val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
     masterTracker.registerMapOutput(10, 0, MapStatus(
-      BlockManagerId("a", "hostA", 1000), Array(1000L), 10))
+      BlockManagerId("a", "hostA", 1000), Array(1000L)))
     slaveTracker.updateEpoch(masterTracker.getEpoch)
     assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq ===
       Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))))
@@ -182,7 +182,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     // Message size should be ~123B, and no exception should be thrown
     masterTracker.registerShuffle(10, 1)
     masterTracker.registerMapOutput(10, 0, MapStatus(
-      BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0), 0))
+      BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
     val senderAddress = RpcAddress("localhost", 12345)
     val rpcCallContext = mock(classOf[RpcCallContext])
     when(rpcCallContext.senderAddress).thenReturn(senderAddress)
@@ -216,11 +216,11 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     // on hostB with output size 3
     tracker.registerShuffle(10, 3)
     tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
-        Array(2L), 1))
+        Array(2L)))
     tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("a", "hostA", 1000),
-        Array(2L), 1))
+        Array(2L)))
     tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
-        Array(3L), 1))
+        Array(3L)))
 
     // When the threshold is 50%, only host A should be returned as a preferred location
     // as it has 4 out of 7 bytes of output.
@@ -260,7 +260,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
       masterTracker.registerShuffle(20, 100)
       (0 until 100).foreach { i =>
         masterTracker.registerMapOutput(20, i, new CompressedMapStatus(
-          BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0), 0))
+          BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
       }
       val senderAddress = RpcAddress("localhost", 12345)
       val rpcCallContext = mock(classOf[RpcCallContext])
@@ -309,9 +309,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
     val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
     tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
-      Array(size0, size1000, size0, size10000), 1))
+      Array(size0, size1000, size0, size10000)))
     tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
-      Array(size10000, size0, size1000, size0), 1))
+      Array(size10000, size0, size1000, size0)))
     assert(tracker.containsShuffle(10))
     assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq ===
         Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 456f97b..b917469 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -391,7 +391,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     assert(mapOutput2.isDefined)
     assert(mapOutput1.get.location === mapOutput2.get.location)
     assert(mapOutput1.get.getSizeForBlock(0) === mapOutput1.get.getSizeForBlock(0))
-    assert(mapOutput1.get.numberOfOutput === mapOutput2.get.numberOfOutput)
 
     // register one of the map outputs -- doesn't matter which one
     mapOutput1.foreach { case mapStatus =>

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 365eab0..d6c9ae6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -445,17 +445,17 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     // map stage1 completes successfully, with one task on each executor
     complete(taskSets(0), Seq(
       (Success,
-        MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), 1)),
+        MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
       (Success,
-        MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), 1)),
+        MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
       (Success, makeMapStatus("hostB", 1))
     ))
     // map stage2 completes successfully, with one task on each executor
     complete(taskSets(1), Seq(
       (Success,
-        MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), 1)),
+        MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
       (Success,
-        MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), 1)),
+        MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
       (Success, makeMapStatus("hostB", 1))
     ))
     // make sure our test setup is correct
@@ -2857,7 +2857,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
 object DAGSchedulerSuite {
   def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
-    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes), 1)
+    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))
 
   def makeBlockManagerId(host: String): BlockManagerId =
     BlockManagerId("exec-" + host, host, 12345)

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index 555e48b..354e638 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -60,7 +60,7 @@ class MapStatusSuite extends SparkFunSuite {
       stddev <- Seq(0.0, 0.01, 0.5, 1.0)
     ) {
       val sizes = Array.fill[Long](numSizes)(abs(round(Random.nextGaussian() * stddev)) + mean)
-      val status = MapStatus(BlockManagerId("a", "b", 10), sizes, 1)
+      val status = MapStatus(BlockManagerId("a", "b", 10), sizes)
       val status1 = compressAndDecompressMapStatus(status)
       for (i <- 0 until numSizes) {
         if (sizes(i) != 0) {
@@ -74,7 +74,7 @@ class MapStatusSuite extends SparkFunSuite {
 
   test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) {
     val sizes = Array.fill[Long](2001)(150L)
-    val status = MapStatus(null, sizes, 1)
+    val status = MapStatus(null, sizes)
     assert(status.isInstanceOf[HighlyCompressedMapStatus])
     assert(status.getSizeForBlock(10) === 150L)
     assert(status.getSizeForBlock(50) === 150L)
@@ -86,7 +86,7 @@ class MapStatusSuite extends SparkFunSuite {
     val sizes = Array.tabulate[Long](3000) { i => i.toLong }
     val avg = sizes.sum / sizes.count(_ != 0)
     val loc = BlockManagerId("a", "b", 10)
-    val status = MapStatus(loc, sizes, 1)
+    val status = MapStatus(loc, sizes)
     val status1 = compressAndDecompressMapStatus(status)
     assert(status1.isInstanceOf[HighlyCompressedMapStatus])
     assert(status1.location == loc)
@@ -108,7 +108,7 @@ class MapStatusSuite extends SparkFunSuite {
     val smallBlockSizes = sizes.filter(n => n > 0 && n < threshold)
     val avg = smallBlockSizes.sum / smallBlockSizes.length
     val loc = BlockManagerId("a", "b", 10)
-    val status = MapStatus(loc, sizes, 1)
+    val status = MapStatus(loc, sizes)
     val status1 = compressAndDecompressMapStatus(status)
     assert(status1.isInstanceOf[HighlyCompressedMapStatus])
     assert(status1.location == loc)
@@ -164,7 +164,7 @@ class MapStatusSuite extends SparkFunSuite {
     SparkEnv.set(env)
     // Value of element in sizes is equal to the corresponding index.
     val sizes = (0L to 2000L).toArray
-    val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes, 1)
+    val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)
     val arrayStream = new ByteArrayOutputStream(102400)
     val objectOutputStream = new ObjectOutputStream(arrayStream)
     assert(status1.isInstanceOf[HighlyCompressedMapStatus])
@@ -196,19 +196,19 @@ class MapStatusSuite extends SparkFunSuite {
     SparkEnv.set(env)
     val sizes = Array.fill[Long](500)(150L)
     // Test default value
-    val status = MapStatus(null, sizes, 1)
+    val status = MapStatus(null, sizes)
     assert(status.isInstanceOf[CompressedMapStatus])
     // Test Non-positive values
     for (s <- -1 to 0) {
       assertThrows[IllegalArgumentException] {
         conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
-        val status = MapStatus(null, sizes, 1)
+        val status = MapStatus(null, sizes)
       }
     }
     // Test positive values
     Seq(1, 100, 499, 500, 501).foreach { s =>
       conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
-      val status = MapStatus(null, sizes, 1)
+      val status = MapStatus(null, sizes)
       if(sizes.length > s) {
         assert(status.isInstanceOf[HighlyCompressedMapStatus])
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 3691244..ac25bce 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -345,8 +345,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     val denseBlockSizes = new Array[Long](5000)
     val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L)
     Seq(denseBlockSizes, sparseBlockSizes).foreach { blockSizes =>
-      ser.serialize(
-        HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), blockSizes, 1))
+      ser.serialize(HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), blockSizes))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index cd28c73..cc1a5e8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst.plans.physical
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{DataType, IntegerType}
 
@@ -209,18 +207,6 @@ case object SinglePartition extends Partitioning {
 }
 
 /**
- * Represents a partitioning where rows are only serialized/deserialized locally. The number
- * of partitions are not changed and also the distribution of rows. This is mainly used to
- * obtain some statistics of map tasks such as number of outputs.
- */
-case class LocalPartitioning(childRDD: RDD[InternalRow]) extends Partitioning {
-  val numPartitions = childRDD.getNumPartitions
-
-  // We will perform this partitioning no matter what the data distribution is.
-  override def satisfies0(required: Distribution): Boolean = false
-}
-
-/**
  * Represents a partitioning where rows are split up across partitions based on the hash
  * of `expressions`.  All rows where `expressions` evaluate to the same values are guaranteed to be
  * in the same partition.

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a01e87c..da49219 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -255,13 +255,6 @@ object SQLConf {
     .intConf
     .createWithDefault(4)
 
-  val LIMIT_FLAT_GLOBAL_LIMIT = buildConf("spark.sql.limit.flatGlobalLimit")
-    .internal()
-    .doc("During global limit, try to evenly distribute limited rows across data " +
-      "partitions. If disabled, scanning data partitions sequentially until reaching limit number.")
-    .booleanConf
-    .createWithDefault(true)
-
   val ADVANCED_PARTITION_PREDICATE_PUSHDOWN =
     buildConf("spark.sql.hive.advancedPartitionPredicatePushdown.enabled")
       .internal()
@@ -1771,8 +1764,6 @@ class SQLConf extends Serializable with Logging {
 
   def limitScaleUpFactor: Int = getConf(LIMIT_SCALE_UP_FACTOR)
 
-  def limitFlatGlobalLimit: Boolean = getConf(LIMIT_FLAT_GLOBAL_LIMIT)
-
   def advancedPartitionPredicatePushdownEnabled: Boolean =
     getConf(ADVANCED_PARTITION_PREDICATE_PUSHDOWN)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 89442a7..dbc6db6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -66,35 +66,24 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * Plans special cases of limit operators.
    */
   object SpecialLimits extends Strategy {
-    private def decideTopRankNode(limit: Int, child: LogicalPlan): Seq[SparkPlan] = {
-      if (limit < conf.topKSortFallbackThreshold) {
-        child match {
-          case Sort(order, true, child) =>
-            TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-          case Project(projectList, Sort(order, true, child)) =>
-            TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
-        }
-      } else {
-        GlobalLimitExec(limit,
-          LocalLimitExec(limit, planLater(child)),
-          orderedLimit = true) :: Nil
-      }
-    }
-
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), s @ Sort(order, true, child)) =>
-          decideTopRankNode(limit, s)
-        case Limit(IntegerLiteral(limit), p @ Project(projectList, Sort(order, true, child))) =>
-          decideTopRankNode(limit, p)
+        case Limit(IntegerLiteral(limit), Sort(order, true, child))
+            if limit < conf.topKSortFallbackThreshold =>
+          TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
+        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
+            if limit < conf.topKSortFallbackThreshold =>
+          TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), child) =>
           CollectLimitExec(limit, planLater(child)) :: Nil
         case other => planLater(other) :: Nil
       }
-      case Limit(IntegerLiteral(limit), s @ Sort(order, true, child)) =>
-        decideTopRankNode(limit, s)
-      case Limit(IntegerLiteral(limit), p @ Project(projectList, Sort(order, true, child))) =>
-        decideTopRankNode(limit, p)
+      case Limit(IntegerLiteral(limit), Sort(order, true, child))
+          if limit < conf.topKSortFallbackThreshold =>
+        TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
+      case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
+          if limit < conf.topKSortFallbackThreshold =>
+        TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 9576605..aba9488 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -231,11 +231,6 @@ object ShuffleExchangeExec {
           override def numPartitions: Int = 1
           override def getPartition(key: Any): Int = 0
         }
-      case l: LocalPartitioning =>
-        new Partitioner {
-          override def numPartitions: Int = l.numPartitions
-          override def getPartition(key: Any): Int = key.asInstanceOf[Int]
-        }
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
       // TODO: Handle BroadcastPartitioning.
     }
@@ -252,9 +247,6 @@ object ShuffleExchangeExec {
         val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
         row => projection(row).getInt(0)
       case RangePartitioning(_, _) | SinglePartition => identity
-      case _: LocalPartitioning =>
-        val partitionId = TaskContext.get().partitionId()
-        _ => partitionId
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 1a09632..66bcda8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -47,16 +47,13 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
+ * Helper trait which defines methods that are shared by both
+ * [[LocalLimitExec]] and [[GlobalLimitExec]].
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport {
-
+trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
+  val limit: Int
   override def output: Seq[Attribute] = child.output
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
   protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
     iter.take(limit)
   }
@@ -96,96 +93,25 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 }
 
 /**
- * Take the `limit` elements of the child output.
+ * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan,
-                           orderedLimit: Boolean = false) extends UnaryExecNode {
+case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
 
-  override def output: Seq[Attribute] = child.output
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
+}
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+/**
+ * Take the first `limit` elements of the child's single output partition.
+ */
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
 
-  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
 
-  protected override def doExecute(): RDD[InternalRow] = {
-    val childRDD = child.execute()
-    val partitioner = LocalPartitioning(childRDD)
-    val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
-      childRDD, child.output, partitioner, serializer)
-    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
-      // submitMapStage does not accept RDD with 0 partition.
-      // So, we will not submit this dependency.
-      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
-      submittedStageFuture.get().recordsByPartitionId.toSeq
-    } else {
-      Nil
-    }
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 
-    // This is an optimization to evenly distribute limited rows across all partitions.
-    // When enabled, Spark goes to take rows at each partition repeatedly until reaching
-    // limit number. When disabled, Spark takes all rows at first partition, then rows
-    // at second partition ..., until reaching limit number.
-    // The optimization is disabled when it is needed to keep the original order of rows
-    // before global sort, e.g., select * from table order by col limit 10.
-    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && !orderedLimit
-
-    val shuffled = new ShuffledRowRDD(shuffleDependency)
-
-    val sumOfOutput = numberOfOutput.sum
-    if (sumOfOutput <= limit) {
-      shuffled
-    } else if (!flatGlobalLimit) {
-      var numRowTaken = 0
-      val takeAmounts = numberOfOutput.map { num =>
-        if (numRowTaken + num < limit) {
-          numRowTaken += num.toInt
-          num.toInt
-        } else {
-          val toTake = limit - numRowTaken
-          numRowTaken += toTake
-          toTake
-        }
-      }
-      val broadMap = sparkContext.broadcast(takeAmounts)
-      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
-        iter.take(broadMap.value(index).toInt)
-      }
-    } else {
-      // We try to evenly require the asked limit number of rows across all child rdd's partitions.
-      var rowsNeedToTake: Long = limit
-      val takeAmountByPartition: Array[Long] = Array.fill[Long](numberOfOutput.length)(0L)
-      val remainingRowsByPartition: Array[Long] = Array(numberOfOutput: _*)
-
-      while (rowsNeedToTake > 0) {
-        val nonEmptyParts = remainingRowsByPartition.count(_ > 0)
-        // If the rows needed to take are less the number of non-empty partitions, take one row from
-        // each non-empty partitions until we reach `limit` rows.
-        // Otherwise, evenly divide the needed rows to each non-empty partitions.
-        val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts)
-        remainingRowsByPartition.zipWithIndex.foreach { case (num, index) =>
-          // In case `rowsNeedToTake` < `nonEmptyParts`, we may run out of `rowsNeedToTake` during
-          // the traversal, so we need to add this check.
-          if (rowsNeedToTake > 0 && num > 0) {
-            if (num >= takePerPart) {
-              rowsNeedToTake -= takePerPart
-              takeAmountByPartition(index) += takePerPart
-              remainingRowsByPartition(index) -= takePerPart
-            } else {
-              rowsNeedToTake -= num
-              takeAmountByPartition(index) += num
-              remainingRowsByPartition(index) -= num
-            }
-          }
-        }
-      }
-      val broadMap = sparkContext.broadcast(takeAmountByPartition)
-      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
-        iter.take(broadMap.value(index).toInt)
-      }
-    }
-  }
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/core/src/test/resources/sql-tests/inputs/limit.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/limit.sql b/sql/core/src/test/resources/sql-tests/inputs/limit.sql
index e33cd81..b4c73cf 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/limit.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/limit.sql
@@ -1,5 +1,3 @@
--- Disable global limit parallel
-set spark.sql.limit.flatGlobalLimit=false;
 
 -- limit on various data types
 SELECT * FROM testdata LIMIT 2;

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
index a862e09..a40ee08 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
@@ -1,9 +1,6 @@
 -- A test suite for IN LIMIT in parent side, subquery, and both predicate subquery
 -- It includes correlated cases.
 
--- Disable global limit optimization
-set spark.sql.limit.flatGlobalLimit=false;
-
 create temporary view t1 as select * from values
   ("val1a", 6S, 8, 10L, float(15.0), 20D, 20E2, timestamp '2014-04-04 01:00:00.000', date '2014-04-04'),
   ("val1b", 8S, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04'),
@@ -100,4 +97,4 @@ WHERE  t1d NOT IN (SELECT t2d
                    LIMIT 1)
 GROUP  BY t1b
 ORDER BY t1b NULLS last
-LIMIT  1;
+LIMIT  1;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/core/src/test/resources/sql-tests/results/limit.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/limit.sql.out b/sql/core/src/test/resources/sql-tests/results/limit.sql.out
index 187f3bd..02fe1de 100644
--- a/sql/core/src/test/resources/sql-tests/results/limit.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/limit.sql.out
@@ -1,134 +1,126 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 15
+-- Number of queries: 14
 
 
 -- !query 0
-set spark.sql.limit.flatGlobalLimit=false
--- !query 0 schema
-struct<key:string,value:string>
--- !query 0 output
-spark.sql.limit.flatGlobalLimit	false
-
-
--- !query 1
 SELECT * FROM testdata LIMIT 2
--- !query 1 schema
+-- !query 0 schema
 struct<key:int,value:string>
--- !query 1 output
+-- !query 0 output
 1	1
 2	2
 
 
--- !query 2
+-- !query 1
 SELECT * FROM arraydata LIMIT 2
--- !query 2 schema
+-- !query 1 schema
 struct<arraycol:array<int>,nestedarraycol:array<array<int>>>
--- !query 2 output
+-- !query 1 output
 [1,2,3]	[[1,2,3]]
 [2,3,4]	[[2,3,4]]
 
 
--- !query 3
+-- !query 2
 SELECT * FROM mapdata LIMIT 2
--- !query 3 schema
+-- !query 2 schema
 struct<mapcol:map<int,string>>
--- !query 3 output
+-- !query 2 output
 {1:"a1",2:"b1",3:"c1",4:"d1",5:"e1"}
 {1:"a2",2:"b2",3:"c2",4:"d2"}
 
 
--- !query 4
+-- !query 3
 SELECT * FROM testdata LIMIT 2 + 1
--- !query 4 schema
+-- !query 3 schema
 struct<key:int,value:string>
--- !query 4 output
+-- !query 3 output
 1	1
 2	2
 3	3
 
 
--- !query 5
+-- !query 4
 SELECT * FROM testdata LIMIT CAST(1 AS int)
--- !query 5 schema
+-- !query 4 schema
 struct<key:int,value:string>
--- !query 5 output
+-- !query 4 output
 1	1
 
 
--- !query 6
+-- !query 5
 SELECT * FROM testdata LIMIT -1
--- !query 6 schema
+-- !query 5 schema
 struct<>
--- !query 6 output
+-- !query 5 output
 org.apache.spark.sql.AnalysisException
 The limit expression must be equal to or greater than 0, but got -1;
 
 
--- !query 7
+-- !query 6
 SELECT * FROM testData TABLESAMPLE (-1 ROWS)
--- !query 7 schema
+-- !query 6 schema
 struct<>
--- !query 7 output
+-- !query 6 output
 org.apache.spark.sql.AnalysisException
 The limit expression must be equal to or greater than 0, but got -1;
 
 
--- !query 8
+-- !query 7
 SELECT * FROM testdata LIMIT CAST(1 AS INT)
--- !query 8 schema
+-- !query 7 schema
 struct<key:int,value:string>
--- !query 8 output
+-- !query 7 output
 1	1
 
 
--- !query 9
+-- !query 8
 SELECT * FROM testdata LIMIT CAST(NULL AS INT)
--- !query 9 schema
+-- !query 8 schema
 struct<>
--- !query 9 output
+-- !query 8 output
 org.apache.spark.sql.AnalysisException
 The evaluated limit expression must not be null, but got CAST(NULL AS INT);
 
 
--- !query 10
+-- !query 9
 SELECT * FROM testdata LIMIT key > 3
--- !query 10 schema
+-- !query 9 schema
 struct<>
--- !query 10 output
+-- !query 9 output
 org.apache.spark.sql.AnalysisException
 The limit expression must evaluate to a constant value, but got (testdata.`key` > 3);
 
 
--- !query 11
+-- !query 10
 SELECT * FROM testdata LIMIT true
--- !query 11 schema
+-- !query 10 schema
 struct<>
--- !query 11 output
+-- !query 10 output
 org.apache.spark.sql.AnalysisException
 The limit expression must be integer type, but got boolean;
 
 
--- !query 12
+-- !query 11
 SELECT * FROM testdata LIMIT 'a'
--- !query 12 schema
+-- !query 11 schema
 struct<>
--- !query 12 output
+-- !query 11 output
 org.apache.spark.sql.AnalysisException
 The limit expression must be integer type, but got string;
 
 
--- !query 13
+-- !query 12
 SELECT * FROM (SELECT * FROM range(10) LIMIT 5) WHERE id > 3
--- !query 13 schema
+-- !query 12 schema
 struct<id:bigint>
--- !query 13 output
+-- !query 12 output
 4
 
 
--- !query 14
+-- !query 13
 SELECT * FROM testdata WHERE key < 3 LIMIT ALL
--- !query 14 schema
+-- !query 13 schema
 struct<key:int,value:string>
--- !query 14 output
+-- !query 13 output
 1	1
 2	2

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out
index 9eb5b33..71ca1f8 100644
--- a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out
@@ -1,16 +1,8 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 9
+-- Number of queries: 8
 
 
 -- !query 0
-set spark.sql.limit.flatGlobalLimit=false
--- !query 0 schema
-struct<key:string,value:string>
--- !query 0 output
-spark.sql.limit.flatGlobalLimit	false
-
-
--- !query 1
 create temporary view t1 as select * from values
   ("val1a", 6S, 8, 10L, float(15.0), 20D, 20E2, timestamp '2014-04-04 01:00:00.000', date '2014-04-04'),
   ("val1b", 8S, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04'),
@@ -25,13 +17,13 @@ create temporary view t1 as select * from values
   ("val1a", 6S, 8, 10L, float(15.0), 20D, 20E2, timestamp '2014-04-04 01:02:00.001', date '2014-04-04'),
   ("val1e", 10S, null, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04')
   as t1(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)
--- !query 1 schema
+-- !query 0 schema
 struct<>
--- !query 1 output
+-- !query 0 output
 
 
 
--- !query 2
+-- !query 1
 create temporary view t2 as select * from values
   ("val2a", 6S, 12, 14L, float(15), 20D, 20E2, timestamp '2014-04-04 01:01:00.000', date '2014-04-04'),
   ("val1b", 10S, 12, 19L, float(17), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04'),
@@ -47,13 +39,13 @@ create temporary view t2 as select * from values
   ("val1f", 19S, null, 19L, float(17), 25D, 26E2, timestamp '2014-10-04 01:01:00.000', date '2014-10-04'),
   ("val1b", null, 16, 19L, float(17), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', null)
   as t2(t2a, t2b, t2c, t2d, t2e, t2f, t2g, t2h, t2i)
--- !query 2 schema
+-- !query 1 schema
 struct<>
--- !query 2 output
+-- !query 1 output
 
 
 
--- !query 3
+-- !query 2
 create temporary view t3 as select * from values
   ("val3a", 6S, 12, 110L, float(15), 20D, 20E2, timestamp '2014-04-04 01:02:00.000', date '2014-04-04'),
   ("val3a", 6S, 12, 10L, float(15), 20D, 20E2, timestamp '2014-05-04 01:02:00.000', date '2014-05-04'),
@@ -68,27 +60,27 @@ create temporary view t3 as select * from values
   ("val3b", 8S, null, 719L, float(17), 25D, 26E2, timestamp '2014-05-04 01:02:00.000', date '2014-05-04'),
   ("val3b", 8S, null, 19L, float(17), 25D, 26E2, timestamp '2015-05-04 01:02:00.000', date '2015-05-04')
   as t3(t3a, t3b, t3c, t3d, t3e, t3f, t3g, t3h, t3i)
--- !query 3 schema
+-- !query 2 schema
 struct<>
--- !query 3 output
+-- !query 2 output
 
 
 
--- !query 4
+-- !query 3
 SELECT *
 FROM   t1
 WHERE  t1a IN (SELECT t2a
                FROM   t2
                WHERE  t1d = t2d)
 LIMIT  2
--- !query 4 schema
+-- !query 3 schema
 struct<t1a:string,t1b:smallint,t1c:int,t1d:bigint,t1e:float,t1f:double,t1g:decimal(2,-2),t1h:timestamp,t1i:date>
--- !query 4 output
+-- !query 3 output
 val1b	8	16	19	17.0	25.0	2600	2014-05-04 01:01:00	2014-05-04
 val1c	8	16	19	17.0	25.0	2600	2014-05-04 01:02:00.001	2014-05-05
 
 
--- !query 5
+-- !query 4
 SELECT *
 FROM   t1
 WHERE  t1c IN (SELECT t2c
@@ -96,16 +88,16 @@ WHERE  t1c IN (SELECT t2c
                WHERE  t2b >= 8
                LIMIT  2)
 LIMIT 4
--- !query 5 schema
+-- !query 4 schema
 struct<t1a:string,t1b:smallint,t1c:int,t1d:bigint,t1e:float,t1f:double,t1g:decimal(2,-2),t1h:timestamp,t1i:date>
--- !query 5 output
+-- !query 4 output
 val1a	16	12	10	15.0	20.0	2000	2014-07-04 01:01:00	2014-07-04
 val1a	16	12	21	15.0	20.0	2000	2014-06-04 01:02:00.001	2014-06-04
 val1b	8	16	19	17.0	25.0	2600	2014-05-04 01:01:00	2014-05-04
 val1c	8	16	19	17.0	25.0	2600	2014-05-04 01:02:00.001	2014-05-05
 
 
--- !query 6
+-- !query 5
 SELECT Count(DISTINCT( t1a )),
        t1b
 FROM   t1
@@ -116,29 +108,29 @@ WHERE  t1d IN (SELECT t2d
 GROUP  BY t1b
 ORDER  BY t1b DESC NULLS FIRST
 LIMIT  1
--- !query 6 schema
+-- !query 5 schema
 struct<count(DISTINCT t1a):bigint,t1b:smallint>
--- !query 6 output
+-- !query 5 output
 1	NULL
 
 
--- !query 7
+-- !query 6
 SELECT *
 FROM   t1
 WHERE  t1b NOT IN (SELECT t2b
                    FROM   t2
                    WHERE  t2b > 6
                    LIMIT  2)
--- !query 7 schema
+-- !query 6 schema
 struct<t1a:string,t1b:smallint,t1c:int,t1d:bigint,t1e:float,t1f:double,t1g:decimal(2,-2),t1h:timestamp,t1i:date>
--- !query 7 output
+-- !query 6 output
 val1a	16	12	10	15.0	20.0	2000	2014-07-04 01:01:00	2014-07-04
 val1a	16	12	21	15.0	20.0	2000	2014-06-04 01:02:00.001	2014-06-04
 val1a	6	8	10	15.0	20.0	2000	2014-04-04 01:00:00	2014-04-04
 val1a	6	8	10	15.0	20.0	2000	2014-04-04 01:02:00.001	2014-04-04
 
 
--- !query 8
+-- !query 7
 SELECT Count(DISTINCT( t1a )),
        t1b
 FROM   t1
@@ -149,7 +141,7 @@ WHERE  t1d NOT IN (SELECT t2d
 GROUP  BY t1b
 ORDER BY t1b NULLS last
 LIMIT  1
--- !query 8 schema
+-- !query 7 schema
 struct<count(DISTINCT t1a):bigint,t1b:smallint>
--- !query 8 output
+-- !query 7 output
 1	6
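
The renumbered golden results above all exercise IN / NOT IN subqueries combined with LIMIT. For orientation only (this sketch is not part of the patch), the first of those queries can be run through the SQL API roughly as follows, assuming the temporary views t1 and t2 have been registered from the same literal rows as in in-limit.sql:

    // Assumes an active SparkSession named `spark` and the t1/t2 temp views from the test file.
    spark.sql(
      """SELECT *
        |FROM   t1
        |WHERE  t1a IN (SELECT t2a
        |               FROM   t2
        |               WHERE  t1d = t2d)
        |LIMIT  2""".stripMargin).show()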

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index ed110f7..d0106c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -557,13 +557,11 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-18004 limit + aggregates") {
-    withSQLConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT.key -> "true") {
-      val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value").repartition(1)
-      val limit2Df = df.limit(2)
-      checkAnswer(
-        limit2Df.groupBy("id").count().select($"id"),
-        limit2Df.select($"id"))
-    }
+    val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value")
+    val limit2Df = df.limit(2)
+    checkAnswer(
+      limit2Df.groupBy("id").count().select($"id"),
+      limit2Df.select($"id"))
   }
 
   test("SPARK-17237 remove backticks in a pivot result schema") {

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f001b13..279b7b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Uuid
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, OneRowRelation, Union}
-import org.apache.spark.sql.execution.{FilterExec, QueryExecution, TakeOrderedAndProjectExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{FilterExec, QueryExecution, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.functions._
@@ -2552,26 +2552,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-25352: Ordered global limit when more than topKSortFallbackThreshold ") {
-    withSQLConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT.key -> "true") {
-      val baseDf = spark.range(1000).toDF.repartition(3).sort("id")
-
-      withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "100") {
-        val expected = baseDf.limit(99)
-        val takeOrderedNode1 = expected.queryExecution.executedPlan
-          .find(_.isInstanceOf[TakeOrderedAndProjectExec])
-        assert(takeOrderedNode1.isDefined)
-
-        val result = baseDf.limit(100)
-        val takeOrderedNode2 = result.queryExecution.executedPlan
-          .find(_.isInstanceOf[TakeOrderedAndProjectExec])
-        assert(takeOrderedNode2.isEmpty)
-
-        checkAnswer(expected, result.collect().take(99))
-      }
-    }
-  }
-
   test("SPARK-25368 Incorrect predicate pushdown returns wrong result") {
     def check(newCol: Column, filter: Column, result: Seq[Row]): Unit = {
       val df1 = spark.createDataFrame(Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 01dc28d..8fcebb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -524,15 +524,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     sortTest()
   }
 
-  test("limit for skew dataframe") {
-    // Create a skew dataframe.
-    val df = testData.repartition(100).union(testData).limit(50)
-    // Because `rdd` of dataframe will add a `DeserializeToObject` on top of `GlobalLimit`,
-    // the `GlobalLimit` will not be replaced with `CollectLimit`. So we can test if `GlobalLimit`
-    // work on skew partitions.
-    assert(df.rdd.count() == 50L)
-  }
-
   test("CTE feature") {
     checkAnswer(
       sql("with q1 as (select * from testData limit 10) select * from q1"),
@@ -1944,7 +1935,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     // TODO: support subexpression elimination in whole stage codegen
     withSQLConf("spark.sql.codegen.wholeStage" -> "false") {
       // select from a table to prevent constant folding.
-      val df = sql("SELECT a, b from testData2 order by a, b limit 1")
+      val df = sql("SELECT a, b from testData2 limit 1")
       checkAnswer(df, Row(1, 1))
 
       checkAnswer(df.selectExpr("a + 1", "a + 1"), Row(2, 2))

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
index c627c51..6ad025f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
@@ -55,7 +55,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       expectedPartitionStartIndices: Array[Int]): Unit = {
     val mapOutputStatistics = bytesByPartitionIdArray.zipWithIndex.map {
       case (bytesByPartitionId, index) =>
-        new MapOutputStatistics(index, bytesByPartitionId, Array[Long](1))
+        new MapOutputStatistics(index, bytesByPartitionId)
     }
     val estimatedPartitionStartIndices =
       coordinator.estimatePartitionStartIndices(mapOutputStatistics)
@@ -119,8 +119,8 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0, 0)
       val mapOutputStatistics =
         Array(
-          new MapOutputStatistics(0, bytesByPartitionId1, Array[Long](0)),
-          new MapOutputStatistics(1, bytesByPartitionId2, Array[Long](0)))
+          new MapOutputStatistics(0, bytesByPartitionId1),
+          new MapOutputStatistics(1, bytesByPartitionId2))
       intercept[AssertionError](coordinator.estimatePartitionStartIndices(mapOutputStatistics))
     }
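
The ExchangeCoordinatorSuite hunks above go back to the two-argument MapOutputStatistics constructor. A minimal sketch of building such statistics for the coordinator tests (note that MapOutputStatistics is private[spark], so this only compiles inside Spark's own source tree; the sketch itself is not in the patch):

    import org.apache.spark.MapOutputStatistics
    // Output bytes per reduce partition reported by the map stage of shuffle 0.
    val bytesByPartitionId = Array[Long](100L, 0L, 250L, 0L, 50L, 0L)
    val stats = new MapOutputStatistics(0, bytesByPartitionId)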
 

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/core/src/test/scala/org/apache/spark/sql/execution/LimitSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/LimitSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/LimitSuite.scala
deleted file mode 100644
index a7840a5..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/LimitSuite.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import scala.util.Random
-
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SharedSQLContext
-
-
-class LimitSuite extends SparkPlanTest with SharedSQLContext {
-
-  private var rand: Random = _
-  private var seed: Long = 0
-
-  protected override def beforeAll(): Unit = {
-    super.beforeAll()
-    seed = System.currentTimeMillis()
-    rand = new Random(seed)
-  }
-
-  test("Produce ordered global limit if more than topKSortFallbackThreshold") {
-    withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "100") {
-      val df = LimitTest.generateRandomInputData(spark, rand).sort("a")
-
-      val globalLimit = df.limit(99).queryExecution.executedPlan.collect {
-        case g: GlobalLimitExec => g
-      }
-      assert(globalLimit.size == 0)
-
-      val topKSort = df.limit(99).queryExecution.executedPlan.collect {
-        case t: TakeOrderedAndProjectExec => t
-      }
-      assert(topKSort.size == 1)
-
-      val orderedGlobalLimit = df.limit(100).queryExecution.executedPlan.collect {
-        case g: GlobalLimitExec => g
-      }
-      assert(orderedGlobalLimit.size == 1 && orderedGlobalLimit(0).orderedLimit == true)
-    }
-  }
-
-  test("Ordered global limit") {
-    val baseDf = LimitTest.generateRandomInputData(spark, rand)
-      .select("a").repartition(3).sort("a")
-
-    withSQLConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT.key -> "true") {
-      val orderedGlobalLimit = GlobalLimitExec(3, baseDf.queryExecution.sparkPlan,
-        orderedLimit = true)
-      val orderedGlobalLimitResult = SparkPlanTest.executePlan(orderedGlobalLimit, spark.sqlContext)
-        .map(_.getInt(0))
-
-      val globalLimit = GlobalLimitExec(3, baseDf.queryExecution.sparkPlan, orderedLimit = false)
-      val globalLimitResult = SparkPlanTest.executePlan(globalLimit, spark.sqlContext)
-          .map(_.getInt(0))
-
-      // Global limit without order takes values at each partition sequentially.
-      // After global sort, the values in second partition must be larger than the values
-      // in first partition.
-      assert(orderedGlobalLimitResult(0) == globalLimitResult(0))
-      assert(orderedGlobalLimitResult(1) < globalLimitResult(1))
-      assert(orderedGlobalLimitResult(2) < globalLimitResult(2))
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index b10da6c..e4e224d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -262,7 +262,7 @@ class PlannerSuite extends SharedSQLContext {
           ).queryExecution.executedPlan.collect {
             case exchange: ShuffleExchangeExec => exchange
           }.length
-          assert(numExchanges === 3)
+          assert(numExchanges === 5)
         }
 
         {
@@ -277,7 +277,7 @@ class PlannerSuite extends SharedSQLContext {
           ).queryExecution.executedPlan.collect {
             case exchange: ShuffleExchangeExec => exchange
           }.length
-          assert(numExchanges === 3)
+          assert(numExchanges === 5)
         }
 
       }
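
The numExchanges assertions updated above rely on collecting shuffle exchanges from the executed plan. A minimal sketch of that counting pattern, assuming some DataFrame `df` (a sketch only, not part of the patch):

    import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
    // Count the shuffle exchange operators in the physical plan of `df`.
    val numExchanges = df.queryExecution.executedPlan.collect {
      case exchange: ShuffleExchangeExec => exchange
    }.length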

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
index 9322204..7e317a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
@@ -19,10 +19,9 @@ package org.apache.spark.sql.execution
 
 import scala.util.Random
 
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
@@ -38,6 +37,14 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext {
     rand = new Random(seed)
   }
 
+  private def generateRandomInputData(): DataFrame = {
+    val schema = new StructType()
+      .add("a", IntegerType, nullable = false)
+      .add("b", IntegerType, nullable = false)
+    val inputData = Seq.fill(10000)(Row(rand.nextInt(), rand.nextInt()))
+    spark.createDataFrame(sparkContext.parallelize(Random.shuffle(inputData), 10), schema)
+  }
+
   /**
    * Adds a no-op filter to the child plan in order to prevent executeCollect() from being
    * called directly on the child plan.
@@ -48,62 +55,32 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext {
   val sortOrder = 'a.desc :: 'b.desc :: Nil
 
   test("TakeOrderedAndProject.doExecute without project") {
-    withSQLConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT.key -> "false") {
-      withClue(s"seed = $seed") {
-        checkThatPlansAgree(
-          LimitTest.generateRandomInputData(spark, rand),
-          input =>
-            noOpFilter(TakeOrderedAndProjectExec(limit, sortOrder, input.output, input)),
-          input =>
-            GlobalLimitExec(limit,
-              LocalLimitExec(limit,
-                SortExec(sortOrder, true, input))),
-          sortAnswers = false)
-      }
+    withClue(s"seed = $seed") {
+      checkThatPlansAgree(
+        generateRandomInputData(),
+        input =>
+          noOpFilter(TakeOrderedAndProjectExec(limit, sortOrder, input.output, input)),
+        input =>
+          GlobalLimitExec(limit,
+            LocalLimitExec(limit,
+              SortExec(sortOrder, true, input))),
+        sortAnswers = false)
     }
   }
 
   test("TakeOrderedAndProject.doExecute with project") {
-    withSQLConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT.key -> "false") {
-      withClue(s"seed = $seed") {
-        checkThatPlansAgree(
-          LimitTest.generateRandomInputData(spark, rand),
-          input =>
-            noOpFilter(
-              TakeOrderedAndProjectExec(limit, sortOrder, Seq(input.output.last), input)),
-          input =>
-            GlobalLimitExec(limit,
-              LocalLimitExec(limit,
-                ProjectExec(Seq(input.output.last),
-                  SortExec(sortOrder, true, input)))),
-          sortAnswers = false)
-      }
-    }
-  }
-
-  test("TakeOrderedAndProject.doExecute equals to ordered global limit") {
-    withSQLConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT.key -> "true") {
-      withClue(s"seed = $seed") {
-        checkThatPlansAgree(
-          LimitTest.generateRandomInputData(spark, rand),
-          input =>
-            noOpFilter(TakeOrderedAndProjectExec(limit, sortOrder, input.output, input)),
-          input =>
-            GlobalLimitExec(limit,
-              LocalLimitExec(limit,
-                SortExec(sortOrder, true, input)), orderedLimit = true),
-          sortAnswers = false)
-      }
+    withClue(s"seed = $seed") {
+      checkThatPlansAgree(
+        generateRandomInputData(),
+        input =>
+          noOpFilter(
+            TakeOrderedAndProjectExec(limit, sortOrder, Seq(input.output.last), input)),
+        input =>
+          GlobalLimitExec(limit,
+            LocalLimitExec(limit,
+              ProjectExec(Seq(input.output.last),
+                SortExec(sortOrder, true, input)))),
+        sortAnswers = false)
     }
   }
 }
-
-object LimitTest {
-  def generateRandomInputData(spark: SparkSession, rand: Random): DataFrame = {
-    val schema = new StructType()
-      .add("a", IntegerType, nullable = false)
-      .add("b", IntegerType, nullable = false)
-    val inputData = Seq.fill(10000)(Row(rand.nextInt(), rand.nextInt()))
-    spark.createDataFrame(spark.sparkContext.parallelize(Random.shuffle(inputData), 10), schema)
-  }
-}
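
The random-input helper moved back into TakeOrderedAndProjectSuite above can also be exercised on its own. The following mirrors the restored generateRandomInputData, assuming an active SparkSession named `spark` (again a sketch, not part of the patch):

    import scala.util.Random
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StructType}

    val rand = new Random(System.currentTimeMillis())
    val schema = new StructType()
      .add("a", IntegerType, nullable = false)
      .add("b", IntegerType, nullable = false)
    // 10000 random (a, b) rows spread over 10 partitions, as in the restored helper.
    val inputData = Seq.fill(10000)(Row(rand.nextInt(), rand.nextInt()))
    val randomDf = spark.createDataFrame(
      spark.sparkContext.parallelize(Random.shuffle(inputData), 10), schema)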

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index b9b2b7d..cebaad5 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -40,7 +40,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   private val originalColumnBatchSize = TestHive.conf.columnBatchSize
   private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
   private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
-  private val originalLimitFlatGlobalLimit = TestHive.conf.limitFlatGlobalLimit
   private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone
 
   def testCases: Seq[(String, File)] = {
@@ -60,8 +59,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
     // Ensures that cross joins are enabled so that we can test them
     TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, true)
-    // Ensure that limit operation returns rows in the same order as Hive
-    TestHive.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, false)
     // Fix session local timezone to America/Los_Angeles for those timezone sensitive tests
     // (timestamp_*)
     TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles")
@@ -76,7 +73,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
       TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
       TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
       TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled)
-      TestHive.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, originalLimitFlatGlobalLimit)
       TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, originalSessionLocalTimeZone)
 
       // For debugging dump some statistics about how much time was spent in various optimizer rules
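
The HiveCompatibilitySuite hunks above keep the existing save/override/restore handling of SQL confs while dropping the LIMIT_FLAT_GLOBAL_LIMIT entry. That remaining pattern looks roughly like this (illustrative, not part of the patch):

    import org.apache.spark.sql.hive.test.TestHive
    import org.apache.spark.sql.internal.SQLConf
    // Save the current value, override it for the test run, then restore it afterwards.
    val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
    TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, true)
    // ... run the compatibility tests ...
    TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled)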

http://git-wip-us.apache.org/repos/asf/spark/blob/89671a27/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 1654129..cc592cf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -22,29 +22,21 @@ import scala.collection.JavaConverters._
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveQueryExecution}
-import org.apache.spark.sql.internal.SQLConf
 
 /**
  * A set of test cases that validate partition and column pruning.
  */
 class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
 
-  private val originalLimitFlatGlobalLimit = TestHive.conf.limitFlatGlobalLimit
-
   override def beforeAll(): Unit = {
     super.beforeAll()
     TestHive.setCacheTables(false)
-    TestHive.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, false)
     // Column/partition pruning is not implemented for `InMemoryColumnarTableScan` yet,
     // need to reset the environment to ensure all referenced tables in this suites are
     // not cached in-memory. Refer to https://issues.apache.org/jira/browse/SPARK-2283
     // for details.
     TestHive.reset()
   }
-   override def afterAll() {
-    TestHive.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, originalLimitFlatGlobalLimit)
-    super.afterAll()
-  }
 
   // Column pruning tests
 

