Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/05/23 18:36:59 UTC

[GitHub] [spark] huonw opened a new pull request #24414: [SPARK-22044][SQL] Add `cost` and `codegen` arguments to `explain`

URL: https://github.com/apache/spark/pull/24414
 
 
   ## What changes were proposed in this pull request?
   
   In SQL it is easy to see the inferred statistics (`EXPLAIN COST`) and
   the generated code (`EXPLAIN CODEGEN`), but getting the same
   information through the Dataset/DataFrame APIs was much more awkward.
   It was harder still from PySpark, and harder again from SparkR, as the
   work-around in each case required dropping down to call JVM functions
   directly.
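   
   For comparison, the SQL route that already works (an illustrative
   snippet for `spark-shell`, assuming a `SparkSession` named `spark`;
   output elided):
   
   ```scala
   // The SQL-side commands named above; EXPLAIN returns a one-column
   // DataFrame holding the plan text.
   spark.sql("EXPLAIN COST SELECT id + 1 FROM range(1000000)").show(truncate = false)
   spark.sql("EXPLAIN CODEGEN SELECT id + 1 FROM range(1000000)").show(truncate = false)
   ```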
   
   This patch adds an overload of `explain` that takes three boolean
   arguments (`extended`, `cost` and `codegen`). It doesn't replace the
   old `explain` overloads (keeping backwards compatibility), and it uses
   booleans so that PySpark and SparkR callers can pass the flags easily.
   The `explain` functions in those two APIs are extended with the extra
   arguments too.
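   
   A sketch of the intended call sites (the argument names come from this
   description; treating them as named parameters with defaults is an
   assumption based on the `explain(cost=true)` example below):
   
   ```scala
   val df = spark.range(1000000).select('id + 1)
   
   // Same information as SQL's EXPLAIN COST:
   df.explain(extended = false, cost = true, codegen = false)
   
   // Same information as SQL's EXPLAIN CODEGEN:
   df.explain(extended = false, cost = false, codegen = true)
   ```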
   
   `cost` example output:
   
   ```
   scala> spark.range(1000000).select('id + 1).explain(cost=true)
   == Optimized Logical Plan ==
   Project [(id#5L + 1) AS (id + 1)#7L], Statistics(sizeInBytes=7.6 MiB)
   +- Range (0, 1000000, step=1, splits=Some(1)), Statistics(sizeInBytes=7.6 MiB)
   
   == Physical Plan ==
   *(1) Project [(id#5L + 1) AS (id + 1)#7L]
   +- *(1) Range (0, 1000000, step=1, splits=1)
   ```
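   
   The `sizeInBytes` above is just the row count times the 8-byte width
   of a `LongType` column (a back-of-the-envelope check, assuming the
   estimate carries no per-row overhead for this plan):
   
   ```scala
   val bytes = 1000000L * 8             // 8,000,000 bytes for one million longs
   val mib   = bytes / 1024.0 / 1024.0  // ≈ 7.63, matching sizeInBytes=7.6 MiB
   ```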
   
   <details><summary>codegen example output (click for details)</summary>
   
   ```
   scala> spark.range(1000000).select('id + 1).explain(codegen=true)
   Found 1 WholeStageCodegen subtrees.
   == Subtree 1 / 1 ==
   *(1) Project [(id#10L + 1) AS (id + 1)#12L]
   +- *(1) Range (0, 1000000, step=1, splits=1)
   
   Generated code:
   /* 001 */ public Object generate(Object[] references) {
   /* 002 */   return new GeneratedIteratorForCodegenStage1(references);
   /* 003 */ }
   /* 004 */
   /* 005 */ // codegenStageId=1
   /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
   /* 007 */   private Object[] references;
   /* 008 */   private scala.collection.Iterator[] inputs;
   /* 009 */   private boolean range_initRange_0;
   /* 010 */   private long range_nextIndex_0;
   /* 011 */   private TaskContext range_taskContext_0;
   /* 012 */   private InputMetrics range_inputMetrics_0;
   /* 013 */   private long range_batchEnd_0;
   /* 014 */   private long range_numElementsTodo_0;
   /* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
   /* 016 */
   /* 017 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
   /* 018 */     this.references = references;
   /* 019 */   }
   /* 020 */
   /* 021 */   public void init(int index, scala.collection.Iterator[] inputs) {
   /* 022 */     partitionIndex = index;
   /* 023 */     this.inputs = inputs;
   /* 024 */
   /* 025 */     range_taskContext_0 = TaskContext.get();
   /* 026 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
   /* 027 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
   /* 028 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
   /* 029 */
   /* 030 */   }
   /* 031 */
   /* 032 */   private void initRange(int idx) {
   /* 033 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
   /* 034 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L);
   /* 035 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(1000000L);
   /* 036 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
   /* 037 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
   /* 038 */     long partitionEnd;
   /* 039 */
   /* 040 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
   /* 041 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
   /* 042 */       range_nextIndex_0 = Long.MAX_VALUE;
   /* 043 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
   /* 044 */       range_nextIndex_0 = Long.MIN_VALUE;
   /* 045 */     } else {
   /* 046 */       range_nextIndex_0 = st.longValue();
   /* 047 */     }
   /* 048 */     range_batchEnd_0 = range_nextIndex_0;
   /* 049 */
   /* 050 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
   /* 051 */     .multiply(step).add(start);
   /* 052 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
   /* 053 */       partitionEnd = Long.MAX_VALUE;
   /* 054 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
   /* 055 */       partitionEnd = Long.MIN_VALUE;
   /* 056 */     } else {
   /* 057 */       partitionEnd = end.longValue();
   /* 058 */     }
   /* 059 */
   /* 060 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
   /* 061 */       java.math.BigInteger.valueOf(range_nextIndex_0));
   /* 062 */     range_numElementsTodo_0  = startToEnd.divide(step).longValue();
   /* 063 */     if (range_numElementsTodo_0 < 0) {
   /* 064 */       range_numElementsTodo_0 = 0;
   /* 065 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
   /* 066 */       range_numElementsTodo_0++;
   /* 067 */     }
   /* 068 */   }
   /* 069 */
   /* 070 */   protected void processNext() throws java.io.IOException {
   /* 071 */     // initialize Range
   /* 072 */     if (!range_initRange_0) {
   /* 073 */       range_initRange_0 = true;
   /* 074 */       initRange(partitionIndex);
   /* 075 */     }
   /* 076 */
   /* 077 */     while (true) {
   /* 078 */       if (range_nextIndex_0 == range_batchEnd_0) {
   /* 079 */         long range_nextBatchTodo_0;
   /* 080 */         if (range_numElementsTodo_0 > 1000L) {
   /* 081 */           range_nextBatchTodo_0 = 1000L;
   /* 082 */           range_numElementsTodo_0 -= 1000L;
   /* 083 */         } else {
   /* 084 */           range_nextBatchTodo_0 = range_numElementsTodo_0;
   /* 085 */           range_numElementsTodo_0 = 0;
   /* 086 */           if (range_nextBatchTodo_0 == 0) break;
   /* 087 */         }
   /* 088 */         range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
   /* 089 */       }
   /* 090 */
   /* 091 */       int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
   /* 092 */       for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
   /* 093 */         long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
   /* 094 */
   /* 095 */         long project_value_0 = -1L;
   /* 096 */         project_value_0 = range_value_0 + 1L;
   /* 097 */         range_mutableStateArray_0[1].reset();
   /* 098 */
   /* 099 */         range_mutableStateArray_0[1].write(0, project_value_0);
   /* 100 */         append((range_mutableStateArray_0[1].getRow()));
   /* 101 */
   /* 102 */         if (shouldStop()) {
   /* 103 */           range_nextIndex_0 = range_value_0 + 1L;
   /* 104 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1);
   /* 105 */           range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1);
   /* 106 */           return;
   /* 107 */         }
   /* 108 */
   /* 109 */       }
   /* 110 */       range_nextIndex_0 = range_batchEnd_0;
   /* 111 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0);
   /* 112 */       range_inputMetrics_0.incRecordsRead(range_localEnd_0);
   /* 113 */       range_taskContext_0.killTaskIfInterrupted();
   /* 114 */     }
   /* 115 */   }
   /* 116 */
   /* 117 */ }
   ```
   
   </details>
   
   ## How was this patch tested?
   
   Added unit tests.
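   
   For illustration, a test along these lines (a sketch assuming a
   ScalaTest suite with a shared `spark` session; the actual tests in the
   patch may be structured differently):
   
   ```scala
   test("explain(cost = true) prints statistics in the optimized plan") {
     import org.apache.spark.sql.functions.col
     val out = new java.io.ByteArrayOutputStream()
     // Dataset.explain prints through Console, so capture stdout around the call.
     Console.withOut(out) {
       spark.range(10).select(col("id") + 1).explain(extended = false, cost = true, codegen = false)
     }
     val text = out.toString
     assert(text.contains("Optimized Logical Plan"))
     assert(text.contains("Statistics(sizeInBytes"))
   }
   ```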
