You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/05/23 18:36:59 UTC
[GitHub] [spark] huonw opened a new pull request #24414: [SPARK-22044][SQL]
Add `cost` and `codegen` arguments to `explain`
huonw opened a new pull request #24414: [SPARK-22044][SQL] Add `cost` and `codegen` arguments to `explain`
URL: https://github.com/apache/spark/pull/24414
## What changes were proposed in this pull request?
In SQL it's easy to see the inferred statistics (`EXPLAIN COST`) and
the generated code (`EXPLAIN CODEGEN`), but it is much more annoying
to do so via the Dataset/DataFrame APIs. It is even more annoying to
access this information from PySpark, and more annoying still from
SparkR, as the work-around in each case requires dropping down to call
JVM functions directly.
This patch exposes both via an overload of `explain` that takes 3
boolean arguments (`extended`, `cost` and `codegen`). This doesn't
replace the old `explain` overloads (to keep backwards compatibility),
and uses booleans so that it is easy to call from PySpark and SparkR.
The `explain` functions in both PySpark and SparkR are extended to
accept these extra arguments too.
cost example output:
```
scala> spark.range(1000000).select('id + 1).explain(cost=true)
== Optimized Logical Plan ==
Project [(id#5L + 1) AS (id + 1)#7L], Statistics(sizeInBytes=7.6 MiB)
+- Range (0, 1000000, step=1, splits=Some(1)), Statistics(sizeInBytes=7.6 MiB)
== Physical Plan ==
*(1) Project [(id#5L + 1) AS (id + 1)#7L]
+- *(1) Range (0, 1000000, step=1, splits=1)
```
<details><summary>codegen example output (click for details)</summary>
```
scala> spark.range(1000000).select('id + 1).explain(codegen=true)
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*(1) Project [(id#10L + 1) AS (id + 1)#12L]
+- *(1) Range (0, 1000000, step=1, splits=1)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean range_initRange_0;
/* 010 */ private long range_nextIndex_0;
/* 011 */ private TaskContext range_taskContext_0;
/* 012 */ private InputMetrics range_inputMetrics_0;
/* 013 */ private long range_batchEnd_0;
/* 014 */ private long range_numElementsTodo_0;
/* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 016 */
/* 017 */ public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 018 */ this.references = references;
/* 019 */ }
/* 020 */
/* 021 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 022 */ partitionIndex = index;
/* 023 */ this.inputs = inputs;
/* 024 */
/* 025 */ range_taskContext_0 = TaskContext.get();
/* 026 */ range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 027 */ range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 028 */ range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */
/* 030 */ }
/* 031 */
/* 032 */ private void initRange(int idx) {
/* 033 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 034 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L);
/* 035 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(1000000L);
/* 036 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 037 */ java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 038 */ long partitionEnd;
/* 039 */
/* 040 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 041 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 042 */ range_nextIndex_0 = Long.MAX_VALUE;
/* 043 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 044 */ range_nextIndex_0 = Long.MIN_VALUE;
/* 045 */ } else {
/* 046 */ range_nextIndex_0 = st.longValue();
/* 047 */ }
/* 048 */ range_batchEnd_0 = range_nextIndex_0;
/* 049 */
/* 050 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 051 */ .multiply(step).add(start);
/* 052 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 053 */ partitionEnd = Long.MAX_VALUE;
/* 054 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 055 */ partitionEnd = Long.MIN_VALUE;
/* 056 */ } else {
/* 057 */ partitionEnd = end.longValue();
/* 058 */ }
/* 059 */
/* 060 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 061 */ java.math.BigInteger.valueOf(range_nextIndex_0));
/* 062 */ range_numElementsTodo_0 = startToEnd.divide(step).longValue();
/* 063 */ if (range_numElementsTodo_0 < 0) {
/* 064 */ range_numElementsTodo_0 = 0;
/* 065 */ } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 066 */ range_numElementsTodo_0++;
/* 067 */ }
/* 068 */ }
/* 069 */
/* 070 */ protected void processNext() throws java.io.IOException {
/* 071 */ // initialize Range
/* 072 */ if (!range_initRange_0) {
/* 073 */ range_initRange_0 = true;
/* 074 */ initRange(partitionIndex);
/* 075 */ }
/* 076 */
/* 077 */ while (true) {
/* 078 */ if (range_nextIndex_0 == range_batchEnd_0) {
/* 079 */ long range_nextBatchTodo_0;
/* 080 */ if (range_numElementsTodo_0 > 1000L) {
/* 081 */ range_nextBatchTodo_0 = 1000L;
/* 082 */ range_numElementsTodo_0 -= 1000L;
/* 083 */ } else {
/* 084 */ range_nextBatchTodo_0 = range_numElementsTodo_0;
/* 085 */ range_numElementsTodo_0 = 0;
/* 086 */ if (range_nextBatchTodo_0 == 0) break;
/* 087 */ }
/* 088 */ range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
/* 089 */ }
/* 090 */
/* 091 */ int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
/* 092 */ for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
/* 093 */ long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
/* 094 */
/* 095 */ long project_value_0 = -1L;
/* 096 */ project_value_0 = range_value_0 + 1L;
/* 097 */ range_mutableStateArray_0[1].reset();
/* 098 */
/* 099 */ range_mutableStateArray_0[1].write(0, project_value_0);
/* 100 */ append((range_mutableStateArray_0[1].getRow()));
/* 101 */
/* 102 */ if (shouldStop()) {
/* 103 */ range_nextIndex_0 = range_value_0 + 1L;
/* 104 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1);
/* 105 */ range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1);
/* 106 */ return;
/* 107 */ }
/* 108 */
/* 109 */ }
/* 110 */ range_nextIndex_0 = range_batchEnd_0;
/* 111 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0);
/* 112 */ range_inputMetrics_0.incRecordsRead(range_localEnd_0);
/* 113 */ range_taskContext_0.killTaskIfInterrupted();
/* 114 */ }
/* 115 */ }
/* 116 */
/* 117 */ }
```
</details>
## How was this patch tested?
Added unit tests.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org