Posted to issues@spark.apache.org by "Wenchen Fan (JIRA)" <ji...@apache.org> on 2016/11/10 05:05:58 UTC

[jira] [Resolved] (SPARK-18147) Broken Spark SQL Codegen

     [ https://issues.apache.org/jira/browse/SPARK-18147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan resolved SPARK-18147.
---------------------------------
       Resolution: Fixed
    Fix Version/s: 2.1.0

Issue resolved by pull request 15807
[https://github.com/apache/spark/pull/15807]

> Broken Spark SQL Codegen
> ------------------------
>
>                 Key: SPARK-18147
>                 URL: https://issues.apache.org/jira/browse/SPARK-18147
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.1
>            Reporter: koert kuipers
>            Priority: Critical
>             Fix For: 2.1.0
>
>
> this is me deliberately trying to break spark sql codegen to uncover potential issues, by creating arbitrarily complex data structures using primitives, strings, basic collections (map, seq, option), tuples, and case classes.
> first example: nested case classes
> code:
> {noformat}
> import scala.reflect.runtime.universe.TypeTag
> import org.apache.spark.sql.{Encoder, Row}
> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
> import org.apache.spark.sql.expressions.Aggregator
>
> // dummy aggregator: reduce/merge ignore their input and finish returns a fixed value,
> // so only the encoders (i.e. the result type) matter for this repro
> class ComplexResultAgg[B: TypeTag, C: TypeTag](val zero: B, result: C) extends Aggregator[Row, B, C] {
>   override def reduce(b: B, input: Row): B = b
>   override def merge(b1: B, b2: B): B = b1
>   override def finish(reduction: B): C = result
>   override def bufferEncoder: Encoder[B] = ExpressionEncoder[B]()
>   override def outputEncoder: Encoder[C] = ExpressionEncoder[C]()
> }
> case class Struct2(d: Double = 0.0, s1: Seq[Double] = Seq.empty, s2: Seq[Long] = Seq.empty)
> case class Struct3(a: Struct2 = Struct2(), b: Struct2 = Struct2())
> val df1 = Seq(("a", "aa"), ("a", "aa"), ("b", "b"), ("b", null)).toDF("x", "y").groupBy("x").agg(
>   new ComplexResultAgg("boo", Struct3()).toColumn
> )
> df1.printSchema
> df1.show
> {noformat}
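>
> for context: the snippet assumes a SparkSession with its implicits in scope (toDF needs them). a minimal sketch of that setup (the builder settings here are my illustrative assumptions, not part of the failing code):
> {noformat}
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession.builder()
>   .master("local[*]")    // illustrative; any spark 2.0.x session reproduces it
>   .appName("SPARK-18147-repro")
>   .getOrCreate()
> import spark.implicits._ // provides toDF on Seq[(String, String)]
> {noformat}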
> the result is a codegen compile failure. the error points at generated line 33 (the "if (isNull1)" inside the split-out helper apply_1), but isNull1 is only declared as a local variable inside apply() (generated line 239), so it is not visible from the helper; janino reports such an unresolved identifier as "not an rvalue":
> {noformat}
> [info]   Cause: java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 33, Column 12: Expression "isNull1" is not an rvalue
> [info] /* 001 */ public java.lang.Object generate(Object[] references) {
> [info] /* 002 */   return new SpecificMutableProjection(references);
> [info] /* 003 */ }
> [info] /* 004 */
> [info] /* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
> [info] /* 006 */
> [info] /* 007 */   private Object[] references;
> [info] /* 008 */   private MutableRow mutableRow;
> [info] /* 009 */   private Object[] values;
> [info] /* 010 */   private java.lang.String errMsg;
> [info] /* 011 */   private Object[] values1;
> [info] /* 012 */   private java.lang.String errMsg1;
> [info] /* 013 */   private boolean[] argIsNulls;
> [info] /* 014 */   private scala.collection.Seq argValue;
> [info] /* 015 */   private java.lang.String errMsg2;
> [info] /* 016 */   private boolean[] argIsNulls1;
> [info] /* 017 */   private scala.collection.Seq argValue1;
> [info] /* 018 */   private java.lang.String errMsg3;
> [info] /* 019 */   private java.lang.String errMsg4;
> [info] /* 020 */   private Object[] values2;
> [info] /* 021 */   private java.lang.String errMsg5;
> [info] /* 022 */   private boolean[] argIsNulls2;
> [info] /* 023 */   private scala.collection.Seq argValue2;
> [info] /* 024 */   private java.lang.String errMsg6;
> [info] /* 025 */   private boolean[] argIsNulls3;
> [info] /* 026 */   private scala.collection.Seq argValue3;
> [info] /* 027 */   private java.lang.String errMsg7;
> [info] /* 028 */   private boolean isNull_0;
> [info] /* 029 */   private InternalRow value_0;
> [info] /* 030 */
> [info] /* 031 */   private void apply_1(InternalRow i) {
> [info] /* 032 */
> [info] /* 033 */     if (isNull1) {
> [info] /* 034 */       throw new RuntimeException(errMsg3);
> [info] /* 035 */     }
> [info] /* 036 */
> [info] /* 037 */     boolean isNull24 = false;
> [info] /* 038 */     final com.tresata.spark.sql.Struct2 value24 = isNull24 ? null : (com.tresata.spark.sql.Struct2) value1.a();
> [info] /* 039 */     isNull24 = value24 == null;
> [info] /* 040 */
> [info] /* 041 */     boolean isNull23 = isNull24;
> [info] /* 042 */     final scala.collection.Seq value23 = isNull23 ? null : (scala.collection.Seq) value24.s2();
> [info] /* 043 */     isNull23 = value23 == null;
> [info] /* 044 */     argIsNulls1[0] = isNull23;
> [info] /* 045 */     argValue1 = value23;
> [info] /* 046 */
> [info] /* 047 */
> [info] /* 048 */
> [info] /* 049 */     boolean isNull22 = false;
> [info] /* 050 */     for (int idx = 0; idx < 1; idx++) {
> [info] /* 051 */       if (argIsNulls1[idx]) { isNull22 = true; break; }
> [info] /* 052 */     }
> [info] /* 053 */
> [info] /* 054 */     final ArrayData value22 = isNull22 ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(argValue1);
> [info] /* 055 */     if (isNull22) {
> [info] /* 056 */       values1[2] = null;
> [info] /* 057 */     } else {
> [info] /* 058 */       values1[2] = value22;
> [info] /* 059 */     }
> [info] /* 060 */   }
> [info] /* 061 */
> [info] /* 062 */
> [info] /* 063 */   private void apply1_1(InternalRow i) {
> [info] /* 064 */
> [info] /* 065 */     if (isNull1) {
> [info] /* 066 */       throw new RuntimeException(errMsg7);
> [info] /* 067 */     }
> [info] /* 068 */
> [info] /* 069 */     boolean isNull41 = false;
> [info] /* 070 */     final com.tresata.spark.sql.Struct2 value41 = isNull41 ? null : (com.tresata.spark.sql.Struct2) value1.b();
> [info] /* 071 */     isNull41 = value41 == null;
> [info] /* 072 */
> [info] /* 073 */     boolean isNull40 = isNull41;
> [info] /* 074 */     final scala.collection.Seq value40 = isNull40 ? null : (scala.collection.Seq) value41.s2();
> [info] /* 075 */     isNull40 = value40 == null;
> [info] /* 076 */     argIsNulls3[0] = isNull40;
> [info] /* 077 */     argValue3 = value40;
> [info] /* 078 */
> [info] /* 079 */
> [info] /* 080 */
> [info] /* 081 */     boolean isNull39 = false;
> [info] /* 082 */     for (int idx = 0; idx < 1; idx++) {
> [info] /* 083 */       if (argIsNulls3[idx]) { isNull39 = true; break; }
> [info] /* 084 */     }
> [info] /* 085 */
> [info] /* 086 */     final ArrayData value39 = isNull39 ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(argValue3);
> [info] /* 087 */     if (isNull39) {
> [info] /* 088 */       values2[2] = null;
> [info] /* 089 */     } else {
> [info] /* 090 */       values2[2] = value39;
> [info] /* 091 */     }
> [info] /* 092 */   }
> [info] /* 093 */
> [info] /* 094 */
> [info] /* 095 */   private void apply_0(InternalRow i) {
> [info] /* 096 */
> [info] /* 097 */     if (isNull1) {
> [info] /* 098 */       throw new RuntimeException(errMsg1);
> [info] /* 099 */     }
> [info] /* 100 */
> [info] /* 101 */     boolean isNull16 = false;
> [info] /* 102 */     final com.tresata.spark.sql.Struct2 value16 = isNull16 ? null : (com.tresata.spark.sql.Struct2) value1.a();
> [info] /* 103 */     isNull16 = value16 == null;
> [info] /* 104 */
> [info] /* 105 */     boolean isNull15 = isNull16;
> [info] /* 106 */     final double value15 = isNull15 ? -1.0 : value16.d();
> [info] /* 107 */     if (isNull15) {
> [info] /* 108 */       values1[0] = null;
> [info] /* 109 */     } else {
> [info] /* 110 */       values1[0] = value15;
> [info] /* 111 */     }
> [info] /* 112 */     if (isNull1) {
> [info] /* 113 */       throw new RuntimeException(errMsg2);
> [info] /* 114 */     }
> [info] /* 115 */
> [info] /* 116 */     boolean isNull20 = false;
> [info] /* 117 */     final com.tresata.spark.sql.Struct2 value20 = isNull20 ? null : (com.tresata.spark.sql.Struct2) value1.a();
> [info] /* 118 */     isNull20 = value20 == null;
> [info] /* 119 */
> [info] /* 120 */     boolean isNull19 = isNull20;
> [info] /* 121 */     final scala.collection.Seq value19 = isNull19 ? null : (scala.collection.Seq) value20.s1();
> [info] /* 122 */     isNull19 = value19 == null;
> [info] /* 123 */     argIsNulls[0] = isNull19;
> [info] /* 124 */     argValue = value19;
> [info] /* 125 */
> [info] /* 126 */
> [info] /* 127 */
> [info] /* 128 */     boolean isNull18 = false;
> [info] /* 129 */     for (int idx = 0; idx < 1; idx++) {
> [info] /* 130 */       if (argIsNulls[idx]) { isNull18 = true; break; }
> [info] /* 131 */     }
> [info] /* 132 */
> [info] /* 133 */     final ArrayData value18 = isNull18 ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(argValue);
> [info] /* 134 */     if (isNull18) {
> [info] /* 135 */       values1[1] = null;
> [info] /* 136 */     } else {
> [info] /* 137 */       values1[1] = value18;
> [info] /* 138 */     }
> [info] /* 139 */   }
> [info] /* 140 */
> [info] /* 141 */
> [info] /* 142 */   private void apply1_0(InternalRow i) {
> [info] /* 143 */
> [info] /* 144 */     if (isNull1) {
> [info] /* 145 */       throw new RuntimeException(errMsg5);
> [info] /* 146 */     }
> [info] /* 147 */
> [info] /* 148 */     boolean isNull33 = false;
> [info] /* 149 */     final com.tresata.spark.sql.Struct2 value33 = isNull33 ? null : (com.tresata.spark.sql.Struct2) value1.b();
> [info] /* 150 */     isNull33 = value33 == null;
> [info] /* 151 */
> [info] /* 152 */     boolean isNull32 = isNull33;
> [info] /* 153 */     final double value32 = isNull32 ? -1.0 : value33.d();
> [info] /* 154 */     if (isNull32) {
> [info] /* 155 */       values2[0] = null;
> [info] /* 156 */     } else {
> [info] /* 157 */       values2[0] = value32;
> [info] /* 158 */     }
> [info] /* 159 */     if (isNull1) {
> [info] /* 160 */       throw new RuntimeException(errMsg6);
> [info] /* 161 */     }
> [info] /* 162 */
> [info] /* 163 */     boolean isNull37 = false;
> [info] /* 164 */     final com.tresata.spark.sql.Struct2 value37 = isNull37 ? null : (com.tresata.spark.sql.Struct2) value1.b();
> [info] /* 165 */     isNull37 = value37 == null;
> [info] /* 166 */
> [info] /* 167 */     boolean isNull36 = isNull37;
> [info] /* 168 */     final scala.collection.Seq value36 = isNull36 ? null : (scala.collection.Seq) value37.s1();
> [info] /* 169 */     isNull36 = value36 == null;
> [info] /* 170 */     argIsNulls2[0] = isNull36;
> [info] /* 171 */     argValue2 = value36;
> [info] /* 172 */
> [info] /* 173 */
> [info] /* 174 */
> [info] /* 175 */     boolean isNull35 = false;
> [info] /* 176 */     for (int idx = 0; idx < 1; idx++) {
> [info] /* 177 */       if (argIsNulls2[idx]) { isNull35 = true; break; }
> [info] /* 178 */     }
> [info] /* 179 */
> [info] /* 180 */     final ArrayData value35 = isNull35 ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(argValue2);
> [info] /* 181 */     if (isNull35) {
> [info] /* 182 */       values2[1] = null;
> [info] /* 183 */     } else {
> [info] /* 184 */       values2[1] = value35;
> [info] /* 185 */     }
> [info] /* 186 */   }
> [info] /* 187 */
> [info] /* 188 */
> [info] /* 189 */   public SpecificMutableProjection(Object[] references) {
> [info] /* 190 */     this.references = references;
> [info] /* 191 */     mutableRow = new org.apache.spark.sql.catalyst.expressions.GenericMutableRow(1);
> [info] /* 192 */     this.values = null;
> [info] /* 193 */     this.errMsg = (java.lang.String) references[1];
> [info] /* 194 */     this.values1 = null;
> [info] /* 195 */     this.errMsg1 = (java.lang.String) references[2];
> [info] /* 196 */     argIsNulls = new boolean[1];
> [info] /* 197 */
> [info] /* 198 */     this.errMsg2 = (java.lang.String) references[3];
> [info] /* 199 */     argIsNulls1 = new boolean[1];
> [info] /* 200 */
> [info] /* 201 */     this.errMsg3 = (java.lang.String) references[4];
> [info] /* 202 */     this.errMsg4 = (java.lang.String) references[5];
> [info] /* 203 */     this.values2 = null;
> [info] /* 204 */     this.errMsg5 = (java.lang.String) references[6];
> [info] /* 205 */     argIsNulls2 = new boolean[1];
> [info] /* 206 */
> [info] /* 207 */     this.errMsg6 = (java.lang.String) references[7];
> [info] /* 208 */     argIsNulls3 = new boolean[1];
> [info] /* 209 */
> [info] /* 210 */     this.errMsg7 = (java.lang.String) references[8];
> [info] /* 211 */     this.isNull_0 = true;
> [info] /* 212 */     this.value_0 = null;
> [info] /* 213 */   }
> [info] /* 214 */
> [info] /* 215 */   public org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection target(MutableRow row) {
> [info] /* 216 */     mutableRow = row;
> [info] /* 217 */     return this;
> [info] /* 218 */   }
> [info] /* 219 */
> [info] /* 220 */   /* Provide immutable access to the last projected row. */
> [info] /* 221 */   public InternalRow currentValue() {
> [info] /* 222 */     return (InternalRow) mutableRow;
> [info] /* 223 */   }
> [info] /* 224 */
> [info] /* 225 */   public java.lang.Object apply(java.lang.Object _i) {
> [info] /* 226 */     InternalRow i = (InternalRow) _i;
> [info] /* 227 */
> [info] /* 228 */
> [info] /* 229 */
> [info] /* 230 */     Object obj = ((Expression) references[0]).eval(null);
> [info] /* 231 */     org.apache.spark.sql.expressions.Aggregator value2 = (org.apache.spark.sql.expressions.Aggregator) obj;
> [info] /* 232 */
> [info] /* 233 */     boolean isNull4 = i.isNullAt(0);
> [info] /* 234 */     UTF8String value4 = isNull4 ? null : (i.getUTF8String(0));
> [info] /* 235 */
> [info] /* 236 */     boolean isNull3 = isNull4;
> [info] /* 237 */     final java.lang.String value3 = isNull3 ? null : (java.lang.String) value4.toString();
> [info] /* 238 */     isNull3 = value3 == null;
> [info] /* 239 */     boolean isNull1 = false || isNull3;
> [info] /* 240 */     final com.tresata.spark.sql.Struct3 value1 = isNull1 ? null : (com.tresata.spark.sql.Struct3) value2.finish(value3);
> [info] /* 241 */     isNull1 = value1 == null;
> [info] /* 242 */
> [info] /* 243 */     boolean isNull5 = false;
> [info] /* 244 */     InternalRow value5 = null;
> [info] /* 245 */     if (!false && isNull1) {
> [info] /* 246 */
> [info] /* 247 */       final InternalRow value7 = null;
> [info] /* 248 */       isNull5 = true;
> [info] /* 249 */       value5 = value7;
> [info] /* 250 */     } else {
> [info] /* 251 */
> [info] /* 252 */       boolean isNull8 = false;
> [info] /* 253 */       this.values = new Object[2];
> [info] /* 254 */       if (isNull1) {
> [info] /* 255 */         throw new RuntimeException(errMsg);
> [info] /* 256 */       }
> [info] /* 257 */
> [info] /* 258 */       boolean isNull11 = false;
> [info] /* 259 */       final com.tresata.spark.sql.Struct2 value11 = isNull11 ? null : (com.tresata.spark.sql.Struct2) value1.a();
> [info] /* 260 */       isNull11 = value11 == null;
> [info] /* 261 */       boolean isNull9 = false;
> [info] /* 262 */       InternalRow value9 = null;
> [info] /* 263 */       if (!false && isNull11) {
> [info] /* 264 */
> [info] /* 265 */         final InternalRow value13 = null;
> [info] /* 266 */         isNull9 = true;
> [info] /* 267 */         value9 = value13;
> [info] /* 268 */       } else {
> [info] /* 269 */
> [info] /* 270 */         boolean isNull14 = false;
> [info] /* 271 */         values1 = new Object[3];apply_0(i);
> [info] /* 272 */         apply_1(i);
> [info] /* 273 */         final InternalRow value14 = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(values1);
> [info] /* 274 */         this.values1 = null;
> [info] /* 275 */         isNull9 = isNull14;
> [info] /* 276 */         value9 = value14;
> [info] /* 277 */       }
> [info] /* 278 */       if (isNull9) {
> [info] /* 279 */         values[0] = null;
> [info] /* 280 */       } else {
> [info] /* 281 */         values[0] = value9;
> [info] /* 282 */       }
> [info] /* 283 */       if (isNull1) {
> [info] /* 284 */         throw new RuntimeException(errMsg4);
> [info] /* 285 */       }
> [info] /* 286 */
> [info] /* 287 */       boolean isNull28 = false;
> [info] /* 288 */       final com.tresata.spark.sql.Struct2 value28 = isNull28 ? null : (com.tresata.spark.sql.Struct2) value1.b();
> [info] /* 289 */       isNull28 = value28 == null;
> [info] /* 290 */       boolean isNull26 = false;
> [info] /* 291 */       InternalRow value26 = null;
> [info] /* 292 */       if (!false && isNull28) {
> [info] /* 293 */
> [info] /* 294 */         final InternalRow value30 = null;
> [info] /* 295 */         isNull26 = true;
> [info] /* 296 */         value26 = value30;
> [info] /* 297 */       } else {
> [info] /* 298 */
> [info] /* 299 */         boolean isNull31 = false;
> [info] /* 300 */         values2 = new Object[3];apply1_0(i);
> [info] /* 301 */         apply1_1(i);
> [info] /* 302 */         final InternalRow value31 = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(values2);
> [info] /* 303 */         this.values2 = null;
> [info] /* 304 */         isNull26 = isNull31;
> [info] /* 305 */         value26 = value31;
> [info] /* 306 */       }
> [info] /* 307 */       if (isNull26) {
> [info] /* 308 */         values[1] = null;
> [info] /* 309 */       } else {
> [info] /* 310 */         values[1] = value26;
> [info] /* 311 */       }
> [info] /* 312 */       final InternalRow value8 = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(values);
> [info] /* 313 */       this.values = null;
> [info] /* 314 */       isNull5 = isNull8;
> [info] /* 315 */       value5 = value8;
> [info] /* 316 */     }
> [info] /* 317 */     this.isNull_0 = isNull5;
> [info] /* 318 */     this.value_0 = value5;
> [info] /* 319 */
> [info] /* 320 */     // copy all the results into MutableRow
> [info] /* 321 */
> [info] /* 322 */     if (!this.isNull_0) {
> [info] /* 323 */       mutableRow.update(0, this.value_0);
> [info] /* 324 */     } else {
> [info] /* 325 */       mutableRow.setNullAt(0);
> [info] /* 326 */     }
> [info] /* 327 */
> [info] /* 328 */     return mutableRow;
> [info] /* 329 */   }
> [info] /* 330 */ }
> {noformat}
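>
> in short: the codegen splitter moved the struct-field writers into helper methods (apply_0, apply_1, apply1_0, apply1_1 above) but left isNull1 as a local of apply(), so the helpers reference a variable they cannot see. a minimal scala sketch of the same scoping mistake (hand-simplified by me; not spark source):
> {noformat}
> class SplitProjectionSketch {
>   private def apply_1(): Unit = {
>     // if (isNull1) throw new RuntimeException(errMsg3)
>     // ^ would not compile: isNull1 is a local of apply(), invisible here
>   }
>   def apply(i: AnyRef): AnyRef = {
>     val isNull1 = false // local variable; the split-out helper above cannot read it
>     apply_1()
>     null
>   }
> }
> {noformat}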


