Posted to commits@spark.apache.org by hv...@apache.org on 2017/03/10 17:04:43 UTC

spark git commit: [SPARK-19786][SQL] Facilitate loop optimizations in a JIT compiler regarding range()

Repository: spark
Updated Branches:
  refs/heads/master 501b71119 -> fcb68e0f5


[SPARK-19786][SQL] Facilitate loop optimizations in a JIT compiler regarding range()

## What changes were proposed in this pull request?

This PR improves the performance of operations using `range()` by changing the Java code generated by Catalyst. It is inspired by this [blog article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html).

This PR changes the generated code in two ways:
1. Replace the while-loop over long instance variables with a for-loop over int local variables
2. Suppress generation of the `shouldStop()` check when it is unnecessary (e.g. when `append()` is not generated)

These changes facilitate loop optimizations in the JIT compiler by feeding it simpler Java code. The performance is improved by 7.6x.

Benchmark program:
```scala
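// Assumed context, not shown in the original snippet: an active SparkSession bound to
// `sparkSession` and Spark's internal micro-benchmark harness (presumably
// org.apache.spark.util.Benchmark) on the classpath.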
val N = 1 << 29
val iters = 2
val benchmark = new Benchmark("range.count", N * iters)
benchmark.addCase(s"with this PR") { i =>
  var n = 0
  var len = 0
  while (n < iters) {
    len += sparkSession.range(N).selectExpr("count(id)").collect.length
    n += 1
  }
}
benchmark.run
```

Performance result without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
range.count:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
w/o this PR                                   1349 / 1356        796.2           1.3       1.0X
```

Performance result with this PR
```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
range.count:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
with this PR                                   177 /  271       6065.3           0.2       1.0X
```

Here is a comparison of the generated code without and with this PR. Only the method `agg_doAggregateWithoutKey` changes.
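
As a side note (not part of this patch), listings like the ones below can be reproduced from a spark-shell with Spark's codegen debugging helper, assuming `spark` is the active SparkSession:

```scala
import org.apache.spark.sql.execution.debug._

// Dumps the Java source produced by whole-stage codegen for this plan,
// including the agg_doAggregateWithoutKey method compared below.
spark.range(1L << 29).selectExpr("count(id)").debugCodegen()
```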

Generated code without this PR
```java

/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numGeneratedRows;
/* 013 */   private boolean range_initRange;
/* 014 */   private long range_number;
/* 015 */   private TaskContext range_taskContext;
/* 016 */   private InputMetrics range_inputMetrics;
/* 017 */   private long range_batchEnd;
/* 018 */   private long range_numElementsTodo;
/* 019 */   private scala.collection.Iterator range_input;
/* 020 */   private UnsafeRow range_result;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter;
/* 023 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
/* 024 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
/* 025 */   private UnsafeRow agg_result;
/* 026 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 027 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 028 */
/* 029 */   public GeneratedIterator(Object[] references) {
/* 030 */     this.references = references;
/* 031 */   }
/* 032 */
/* 033 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 034 */     partitionIndex = index;
/* 035 */     this.inputs = inputs;
/* 036 */     agg_initAgg = false;
/* 037 */
/* 038 */     this.range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 039 */     this.range_numGeneratedRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 040 */     range_initRange = false;
/* 041 */     range_number = 0L;
/* 042 */     range_taskContext = TaskContext.get();
/* 043 */     range_inputMetrics = range_taskContext.taskMetrics().inputMetrics();
/* 044 */     range_batchEnd = 0;
/* 045 */     range_numElementsTodo = 0L;
/* 046 */     range_input = inputs[0];
/* 047 */     range_result = new UnsafeRow(1);
/* 048 */     this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0);
/* 049 */     this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1);
/* 050 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 051 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 052 */     agg_result = new UnsafeRow(1);
/* 053 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 054 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 055 */
/* 056 */   }
/* 057 */
/* 058 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 059 */     // initialize aggregation buffer
/* 060 */     agg_bufIsNull = false;
/* 061 */     agg_bufValue = 0L;
/* 062 */
/* 063 */     // initialize Range
/* 064 */     if (!range_initRange) {
/* 065 */       range_initRange = true;
/* 066 */       initRange(partitionIndex);
/* 067 */     }
/* 068 */
/* 069 */     while (true) {
/* 070 */       while (range_number != range_batchEnd) {
/* 071 */         long range_value = range_number;
/* 072 */         range_number += 1L;
/* 073 */
/* 074 */         // do aggregate
/* 075 */         // common sub-expressions
/* 076 */
/* 077 */         // evaluate aggregate function
/* 078 */         boolean agg_isNull1 = false;
/* 079 */
/* 080 */         long agg_value1 = -1L;
/* 081 */         agg_value1 = agg_bufValue + 1L;
/* 082 */         // update aggregation buffer
/* 083 */         agg_bufIsNull = false;
/* 084 */         agg_bufValue = agg_value1;
/* 085 */
/* 086 */         if (shouldStop()) return;
/* 087 */       }
/* 088 */
/* 089 */       if (range_taskContext.isInterrupted()) {
/* 090 */         throw new TaskKilledException();
/* 091 */       }
/* 092 */
/* 093 */       long range_nextBatchTodo;
/* 094 */       if (range_numElementsTodo > 1000L) {
/* 095 */         range_nextBatchTodo = 1000L;
/* 096 */         range_numElementsTodo -= 1000L;
/* 097 */       } else {
/* 098 */         range_nextBatchTodo = range_numElementsTodo;
/* 099 */         range_numElementsTodo = 0;
/* 100 */         if (range_nextBatchTodo == 0) break;
/* 101 */       }
/* 102 */       range_numOutputRows.add(range_nextBatchTodo);
/* 103 */       range_inputMetrics.incRecordsRead(range_nextBatchTodo);
/* 104 */
/* 105 */       range_batchEnd += range_nextBatchTodo * 1L;
/* 106 */     }
/* 107 */
/* 108 */   }
/* 109 */
/* 110 */   private void initRange(int idx) {
/* 111 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 112 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L);
/* 113 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L);
/* 114 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 115 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 116 */     long partitionEnd;
/* 117 */
/* 118 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 119 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 120 */       range_number = Long.MAX_VALUE;
/* 121 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 122 */       range_number = Long.MIN_VALUE;
/* 123 */     } else {
/* 124 */       range_number = st.longValue();
/* 125 */     }
/* 126 */     range_batchEnd = range_number;
/* 127 */
/* 128 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 129 */     .multiply(step).add(start);
/* 130 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 131 */       partitionEnd = Long.MAX_VALUE;
/* 132 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 133 */       partitionEnd = Long.MIN_VALUE;
/* 134 */     } else {
/* 135 */       partitionEnd = end.longValue();
/* 136 */     }
/* 137 */
/* 138 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 139 */       java.math.BigInteger.valueOf(range_number));
/* 140 */     range_numElementsTodo  = startToEnd.divide(step).longValue();
/* 141 */     if (range_numElementsTodo < 0) {
/* 142 */       range_numElementsTodo = 0;
/* 143 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 144 */       range_numElementsTodo++;
/* 145 */     }
/* 146 */   }
/* 147 */
/* 148 */   protected void processNext() throws java.io.IOException {
/* 149 */     while (!agg_initAgg) {
/* 150 */       agg_initAgg = true;
/* 151 */       long agg_beforeAgg = System.nanoTime();
/* 152 */       agg_doAggregateWithoutKey();
/* 153 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 154 */
/* 155 */       // output the result
/* 156 */
/* 157 */       agg_numOutputRows.add(1);
/* 158 */       agg_rowWriter.zeroOutNullBytes();
/* 159 */
/* 160 */       if (agg_bufIsNull) {
/* 161 */         agg_rowWriter.setNullAt(0);
/* 162 */       } else {
/* 163 */         agg_rowWriter.write(0, agg_bufValue);
/* 164 */       }
/* 165 */       append(agg_result);
/* 166 */     }
/* 167 */   }
/* 168 */ }
```

Generated code with this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numGeneratedRows;
/* 013 */   private boolean range_initRange;
/* 014 */   private long range_number;
/* 015 */   private TaskContext range_taskContext;
/* 016 */   private InputMetrics range_inputMetrics;
/* 017 */   private long range_batchEnd;
/* 018 */   private long range_numElementsTodo;
/* 019 */   private scala.collection.Iterator range_input;
/* 020 */   private UnsafeRow range_result;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter;
/* 023 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
/* 024 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
/* 025 */   private UnsafeRow agg_result;
/* 026 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 027 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 028 */
/* 029 */   public GeneratedIterator(Object[] references) {
/* 030 */     this.references = references;
/* 031 */   }
/* 032 */
/* 033 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 034 */     partitionIndex = index;
/* 035 */     this.inputs = inputs;
/* 036 */     agg_initAgg = false;
/* 037 */
/* 038 */     this.range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 039 */     this.range_numGeneratedRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 040 */     range_initRange = false;
/* 041 */     range_number = 0L;
/* 042 */     range_taskContext = TaskContext.get();
/* 043 */     range_inputMetrics = range_taskContext.taskMetrics().inputMetrics();
/* 044 */     range_batchEnd = 0;
/* 045 */     range_numElementsTodo = 0L;
/* 046 */     range_input = inputs[0];
/* 047 */     range_result = new UnsafeRow(1);
/* 048 */     this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0);
/* 049 */     this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1);
/* 050 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 051 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 052 */     agg_result = new UnsafeRow(1);
/* 053 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 054 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 055 */
/* 056 */   }
/* 057 */
/* 058 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 059 */     // initialize aggregation buffer
/* 060 */     agg_bufIsNull = false;
/* 061 */     agg_bufValue = 0L;
/* 062 */
/* 063 */     // initialize Range
/* 064 */     if (!range_initRange) {
/* 065 */       range_initRange = true;
/* 066 */       initRange(partitionIndex);
/* 067 */     }
/* 068 */
/* 069 */     while (true) {
/* 070 */       long range_range = range_batchEnd - range_number;
/* 071 */       if (range_range != 0L) {
/* 072 */         int range_localEnd = (int)(range_range / 1L);
/* 073 */         for (int range_localIdx = 0; range_localIdx < range_localEnd; range_localIdx++) {
/* 074 */           long range_value = ((long)range_localIdx * 1L) + range_number;
/* 075 */
/* 076 */           // do aggregate
/* 077 */           // common sub-expressions
/* 078 */
/* 079 */           // evaluate aggregate function
/* 080 */           boolean agg_isNull1 = false;
/* 081 */
/* 082 */           long agg_value1 = -1L;
/* 083 */           agg_value1 = agg_bufValue + 1L;
/* 084 */           // update aggregation buffer
/* 085 */           agg_bufIsNull = false;
/* 086 */           agg_bufValue = agg_value1;
/* 087 */
/* 088 */           // shouldStop check is eliminated
/* 089 */         }
/* 090 */         range_number = range_batchEnd;
/* 091 */       }
/* 092 */
/* 093 */       if (range_taskContext.isInterrupted()) {
/* 094 */         throw new TaskKilledException();
/* 095 */       }
/* 096 */
/* 097 */       long range_nextBatchTodo;
/* 098 */       if (range_numElementsTodo > 1000L) {
/* 099 */         range_nextBatchTodo = 1000L;
/* 100 */         range_numElementsTodo -= 1000L;
/* 101 */       } else {
/* 102 */         range_nextBatchTodo = range_numElementsTodo;
/* 103 */         range_numElementsTodo = 0;
/* 104 */         if (range_nextBatchTodo == 0) break;
/* 105 */       }
/* 106 */       range_numOutputRows.add(range_nextBatchTodo);
/* 107 */       range_inputMetrics.incRecordsRead(range_nextBatchTodo);
/* 108 */
/* 109 */       range_batchEnd += range_nextBatchTodo * 1L;
/* 110 */     }
/* 111 */
/* 112 */   }
/* 113 */
/* 114 */   private void initRange(int idx) {
/* 115 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 116 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L);
/* 117 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L);
/* 118 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 119 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 120 */     long partitionEnd;
/* 121 */
/* 122 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 123 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 124 */       range_number = Long.MAX_VALUE;
/* 125 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 126 */       range_number = Long.MIN_VALUE;
/* 127 */     } else {
/* 128 */       range_number = st.longValue();
/* 129 */     }
/* 130 */     range_batchEnd = range_number;
/* 131 */
/* 132 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 133 */     .multiply(step).add(start);
/* 134 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 135 */       partitionEnd = Long.MAX_VALUE;
/* 136 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 137 */       partitionEnd = Long.MIN_VALUE;
/* 138 */     } else {
/* 139 */       partitionEnd = end.longValue();
/* 140 */     }
/* 141 */
/* 142 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 143 */       java.math.BigInteger.valueOf(range_number));
/* 144 */     range_numElementsTodo  = startToEnd.divide(step).longValue();
/* 145 */     if (range_numElementsTodo < 0) {
/* 146 */       range_numElementsTodo = 0;
/* 147 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 148 */       range_numElementsTodo++;
/* 149 */     }
/* 150 */   }
/* 151 */
/* 152 */   protected void processNext() throws java.io.IOException {
/* 153 */     while (!agg_initAgg) {
/* 154 */       agg_initAgg = true;
/* 155 */       long agg_beforeAgg = System.nanoTime();
/* 156 */       agg_doAggregateWithoutKey();
/* 157 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 158 */
/* 159 */       // output the result
/* 160 */
/* 161 */       agg_numOutputRows.add(1);
/* 162 */       agg_rowWriter.zeroOutNullBytes();
/* 163 */
/* 164 */       if (agg_bufIsNull) {
/* 165 */         agg_rowWriter.setNullAt(0);
/* 166 */       } else {
/* 167 */         agg_rowWriter.write(0, agg_bufValue);
/* 168 */       }
/* 169 */       append(agg_result);
/* 170 */     }
/* 171 */   }
/* 172 */ }
```

Part of the `shouldStop()` suppression was originally developed by inouehrs.

## How was this patch tested?

Added new tests to `DataFrameRangeSuite`.

Author: Kazuaki Ishizaki <is...@jp.ibm.com>

Closes #17122 from kiszk/SPARK-19786.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fcb68e0f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fcb68e0f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fcb68e0f

Branch: refs/heads/master
Commit: fcb68e0f5d49234ac4527109887ff08cd4e1c29f
Parents: 501b711
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Authored: Fri Mar 10 18:04:37 2017 +0100
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Fri Mar 10 18:04:37 2017 +0100

----------------------------------------------------------------------
 .../apache/spark/sql/execution/SortExec.scala   |  2 ++
 .../sql/execution/WholeStageCodegenExec.scala   | 15 +++++++++++
 .../execution/aggregate/HashAggregateExec.scala |  2 ++
 .../sql/execution/basicPhysicalOperators.scala  | 27 +++++++++++++++-----
 .../apache/spark/sql/DataFrameRangeSuite.scala  | 16 ++++++++++++
 5 files changed, 55 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fcb68e0f/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index cc576bb..f98ae82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -177,6 +177,8 @@ case class SortExec(
      """.stripMargin.trim
   }
 
+  protected override val shouldStopRequired = false
+
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
     s"""
        |${row.code}

http://git-wip-us.apache.org/repos/asf/spark/blob/fcb68e0f/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index c58474e..c31fd92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -206,6 +206,21 @@ trait CodegenSupport extends SparkPlan {
   def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
     throw new UnsupportedOperationException
   }
+
+  /**
+   * For optimization to suppress shouldStop() in a loop of WholeStageCodegen.
+   * Returning true means we need to insert shouldStop() into the loop producing rows, if any.
+   */
+  def isShouldStopRequired: Boolean = {
+    return shouldStopRequired && (this.parent == null || this.parent.isShouldStopRequired)
+  }
+
+  /**
+   * Set to false if this plan consumes all rows produced by children but doesn't output row
+   * to buffer by calling append(), so the children don't require shouldStop()
+   * in the loop of producing rows.
+   */
+  protected def shouldStopRequired: Boolean = true
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fcb68e0f/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 4529ed0..68c8e6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -238,6 +238,8 @@ case class HashAggregateExec(
      """.stripMargin
   }
 
+  protected override val shouldStopRequired = false
+
   private def doConsumeWithoutKeys(ctx: CodegenContext, input: Seq[ExprCode]): String = {
     // only have DeclarativeAggregate
     val functions = aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate])

http://git-wip-us.apache.org/repos/asf/spark/blob/fcb68e0f/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 87e90ed..d876688 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -387,8 +387,8 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     // How many values should be generated in the next batch.
     val nextBatchTodo = ctx.freshName("nextBatchTodo")
 
-    // The default size of a batch.
-    val batchSize = 1000L
+    // The default size of a batch, which must be positive integer
+    val batchSize = 1000
 
     ctx.addNewFunction("initRange",
       s"""
@@ -434,6 +434,15 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     val input = ctx.freshName("input")
     // Right now, Range is only used when there is one upstream.
     ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+
+    val localIdx = ctx.freshName("localIdx")
+    val localEnd = ctx.freshName("localEnd")
+    val range = ctx.freshName("range")
+    val shouldStop = if (isShouldStopRequired) {
+      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
+    } else {
+      "// shouldStop check is eliminated"
+    }
     s"""
       | // initialize Range
       | if (!$initTerm) {
@@ -442,11 +451,15 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
       | }
       |
       | while (true) {
-      |   while ($number != $batchEnd) {
-      |     long $value = $number;
-      |     $number += ${step}L;
-      |     ${consume(ctx, Seq(ev))}
-      |     if (shouldStop()) return;
+      |   long $range = $batchEnd - $number;
+      |   if ($range != 0L) {
+      |     int $localEnd = (int)($range / ${step}L);
+      |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
+      |       long $value = ((long)$localIdx * ${step}L) + $number;
+      |       ${consume(ctx, Seq(ev))}
+      |       $shouldStop
+      |     }
+      |     $number = $batchEnd;
       |   }
       |
       |   if ($taskContext.isInterrupted()) {

http://git-wip-us.apache.org/repos/asf/spark/blob/fcb68e0f/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index acf393a..5e323c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -89,6 +89,22 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
     val n = 9L * 1000 * 1000 * 1000 * 1000 * 1000 * 1000
     val res13 = spark.range(-n, n, n / 9).select("id")
     assert(res13.count == 18)
+
+    // range with non aggregation operation
+    val res14 = spark.range(0, 100, 2).toDF.filter("50 <= id")
+    val len14 = res14.collect.length
+    assert(len14 == 25)
+
+    val res15 = spark.range(100, -100, -2).toDF.filter("id <= 0")
+    val len15 = res15.collect.length
+    assert(len15 == 50)
+
+    val res16 = spark.range(-1500, 1500, 3).toDF.filter("0 <= id")
+    val len16 = res16.collect.length
+    assert(len16 == 500)
+
+    val res17 = spark.range(10, 0, -1, 1).toDF.sortWithinPartitions("id")
+    assert(res17.collect === (1 to 10).map(i => Row(i)).toArray)
   }
 
   test("Range with randomized parameters") {

