Posted to commits@spark.apache.org by hv...@apache.org on 2018/03/13 22:04:20 UTC
spark git commit: [SPARK-23598][SQL] Make methods in BufferedRowIterator public to avoid runtime error for a large query
Repository: spark
Updated Branches:
refs/heads/master 918fb9bee -> 1098933b0
[SPARK-23598][SQL] Make methods in BufferedRowIterator public to avoid runtime error for a large query
## What changes were proposed in this pull request?
This PR fixes a runtime error that occurs for a large query when the generated code is split into nested classes. The issue is that `append()`, `stopEarly()`, and the other helper methods of `BufferedRowIterator` are declared `protected`, so they are not accessible from the split classes: those are inner classes of the generated iterator, not subclasses of `BufferedRowIterator`, and the JVM therefore rejects direct calls to them.
This PR fixes the issue by making these methods `public`.
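For context, the failure is a plain JVM access rule rather than anything Spark-specific: a `protected` method may be invoked from a subclass, but an inner class of that subclass is a distinct class that does not itself extend the superclass. `javac` makes such calls work anyway by emitting a synthetic accessor in the outer class; Janino, which compiles Spark's generated code, emits a direct call that the JVM rejects. A minimal sketch of the pattern (hypothetical names, two source files):
```
// File framework/BaseIterator.java -- stand-in for BufferedRowIterator
package framework;

public abstract class BaseIterator {
  // protected: callable from subclasses, but not directly from their
  // inner classes across packages without a compiler-generated accessor
  protected boolean stopEarly() { return false; }
}

// File generated/SplitIterator.java -- stand-in for the generated iterator
package generated;

public class SplitIterator extends framework.BaseIterator {
  private class NestedPart {  // analogue of agg_NestedClass1
    void run() {
      // javac routes this call through a hidden accessor in SplitIterator,
      // so it works; Janino compiles it as a direct invocation from
      // NestedPart, and the JVM throws IllegalAccessError at runtime.
      // A public stopEarly() needs no accessor, so the direct call is
      // legal under either compiler.
      if (!stopEarly()) {
        // process rows ...
      }
    }
  }
}
```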
Before this PR, running the attached program with `CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD=-1` (which forces every generated class to be split) produced the following exception.
```
test("SPARK-23598") {
// When set -1 to CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD, an exception is thrown
val df_pet_age = Seq((8, "bat"), (15, "mouse"), (5, "horse")).toDF("age", "name")
df_pet_age.groupBy("name").avg("age").show()
}
```
Exception:
```
19:40:52.591 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19:41:32.319 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.shouldStop()Z from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1.agg_doAggregateWithKeys$(generated.java:203)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:160)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:616)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
...
```
Generated code (line 195 calls `stopEarly()`):
```
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean agg_initAgg;
/* 010 */ private boolean agg_bufIsNull;
/* 011 */ private double agg_bufValue;
/* 012 */ private boolean agg_bufIsNull1;
/* 013 */ private long agg_bufValue1;
/* 014 */ private agg_FastHashMap agg_fastHashMap;
/* 015 */ private org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> agg_fastHashMapIter;
/* 016 */ private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 017 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 018 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
/* 019 */ private scala.collection.Iterator inputadapter_input;
/* 020 */ private boolean agg_agg_isNull11;
/* 021 */ private boolean agg_agg_isNull25;
/* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] agg_mutableStateArray1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[2];
/* 023 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] agg_mutableStateArray2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 024 */ private UnsafeRow[] agg_mutableStateArray = new UnsafeRow[2];
/* 025 */
/* 026 */ public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 027 */ this.references = references;
/* 028 */ }
/* 029 */
/* 030 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 031 */ partitionIndex = index;
/* 032 */ this.inputs = inputs;
/* 033 */
/* 034 */ agg_fastHashMap = new agg_FastHashMap(((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getTaskMemoryManager(), ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getEmptyAggregationBuffer());
/* 035 */ agg_hashMap = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).createHashMap();
/* 036 */ inputadapter_input = inputs[0];
/* 037 */ agg_mutableStateArray[0] = new UnsafeRow(1);
/* 038 */ agg_mutableStateArray1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[0], 32);
/* 039 */ agg_mutableStateArray2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[0], 1);
/* 040 */ agg_mutableStateArray[1] = new UnsafeRow(3);
/* 041 */ agg_mutableStateArray1[1] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[1], 32);
/* 042 */ agg_mutableStateArray2[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[1], 3);
/* 043 */
/* 044 */ }
/* 045 */
/* 046 */ public class agg_FastHashMap {
/* 047 */ private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
/* 048 */ private int[] buckets;
/* 049 */ private int capacity = 1 << 16;
/* 050 */ private double loadFactor = 0.5;
/* 051 */ private int numBuckets = (int) (capacity / loadFactor);
/* 052 */ private int maxSteps = 2;
/* 053 */ private int numRows = 0;
/* 054 */ private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[1] /* keyName */), org.apache.spark.sql.types.DataTypes.StringType);
/* 055 */ private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[2] /* keyName */), org.apache.spark.sql.types.DataTypes.DoubleType)
/* 056 */ .add(((java.lang.String) references[3] /* keyName */), org.apache.spark.sql.types.DataTypes.LongType);
/* 057 */ private Object emptyVBase;
/* 058 */ private long emptyVOff;
/* 059 */ private int emptyVLen;
/* 060 */ private boolean isBatchFull = false;
/* 061 */
/* 062 */ public agg_FastHashMap(
/* 063 */ org.apache.spark.memory.TaskMemoryManager taskMemoryManager,
/* 064 */ InternalRow emptyAggregationBuffer) {
/* 065 */ batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
/* 066 */ .allocate(keySchema, valueSchema, taskMemoryManager, capacity);
/* 067 */
/* 068 */ final UnsafeProjection valueProjection = UnsafeProjection.create(valueSchema);
/* 069 */ final byte[] emptyBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();
/* 070 */
/* 071 */ emptyVBase = emptyBuffer;
/* 072 */ emptyVOff = Platform.BYTE_ARRAY_OFFSET;
/* 073 */ emptyVLen = emptyBuffer.length;
/* 074 */
/* 075 */ buckets = new int[numBuckets];
/* 076 */ java.util.Arrays.fill(buckets, -1);
/* 077 */ }
/* 078 */
/* 079 */ public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(UTF8String agg_key) {
/* 080 */ long h = hash(agg_key);
/* 081 */ int step = 0;
/* 082 */ int idx = (int) h & (numBuckets - 1);
/* 083 */ while (step < maxSteps) {
/* 084 */ // Return bucket index if it's either an empty slot or already contains the key
/* 085 */ if (buckets[idx] == -1) {
/* 086 */ if (numRows < capacity && !isBatchFull) {
/* 087 */ // creating the unsafe for new entry
/* 088 */ UnsafeRow agg_result = new UnsafeRow(1);
/* 089 */ org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder
/* 090 */ = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result,
/* 091 */ 32);
/* 092 */ org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter
/* 093 */ = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
/* 094 */ agg_holder,
/* 095 */ 1);
/* 096 */ agg_holder.reset(); //TODO: investigate if reset or zeroout are actually needed
/* 097 */ agg_rowWriter.zeroOutNullBytes();
/* 098 */ agg_rowWriter.write(0, agg_key);
/* 099 */ agg_result.setTotalSize(agg_holder.totalSize());
/* 100 */ Object kbase = agg_result.getBaseObject();
/* 101 */ long koff = agg_result.getBaseOffset();
/* 102 */ int klen = agg_result.getSizeInBytes();
/* 103 */
/* 104 */ UnsafeRow vRow
/* 105 */ = batch.appendRow(kbase, koff, klen, emptyVBase, emptyVOff, emptyVLen);
/* 106 */ if (vRow == null) {
/* 107 */ isBatchFull = true;
/* 108 */ } else {
/* 109 */ buckets[idx] = numRows++;
/* 110 */ }
/* 111 */ return vRow;
/* 112 */ } else {
/* 113 */ // No more space
/* 114 */ return null;
/* 115 */ }
/* 116 */ } else if (equals(idx, agg_key)) {
/* 117 */ return batch.getValueRow(buckets[idx]);
/* 118 */ }
/* 119 */ idx = (idx + 1) & (numBuckets - 1);
/* 120 */ step++;
/* 121 */ }
/* 122 */ // Didn't find it
/* 123 */ return null;
/* 124 */ }
/* 125 */
/* 126 */ private boolean equals(int idx, UTF8String agg_key) {
/* 127 */ UnsafeRow row = batch.getKeyRow(buckets[idx]);
/* 128 */ return (row.getUTF8String(0).equals(agg_key));
/* 129 */ }
/* 130 */
/* 131 */ private long hash(UTF8String agg_key) {
/* 132 */ long agg_hash = 0;
/* 133 */
/* 134 */ int agg_result = 0;
/* 135 */ byte[] agg_bytes = agg_key.getBytes();
/* 136 */ for (int i = 0; i < agg_bytes.length; i++) {
/* 137 */ int agg_hash1 = agg_bytes[i];
/* 138 */ agg_result = (agg_result ^ (0x9e3779b9)) + agg_hash1 + (agg_result << 6) + (agg_result >>> 2);
/* 139 */ }
/* 140 */
/* 141 */ agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result + (agg_hash << 6) + (agg_hash >>> 2);
/* 142 */
/* 143 */ return agg_hash;
/* 144 */ }
/* 145 */
/* 146 */ public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
/* 147 */ return batch.rowIterator();
/* 148 */ }
/* 149 */
/* 150 */ public void close() {
/* 151 */ batch.close();
/* 152 */ }
/* 153 */
/* 154 */ }
/* 155 */
/* 156 */ protected void processNext() throws java.io.IOException {
/* 157 */ if (!agg_initAgg) {
/* 158 */ agg_initAgg = true;
/* 159 */ long wholestagecodegen_beforeAgg = System.nanoTime();
/* 160 */ agg_nestedClassInstance1.agg_doAggregateWithKeys();
/* 161 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[8] /* aggTime */).add((System.nanoTime() - wholestagecodegen_beforeAgg) / 1000000);
/* 162 */ }
/* 163 */
/* 164 */ // output the result
/* 165 */
/* 166 */ while (agg_fastHashMapIter.next()) {
/* 167 */ UnsafeRow agg_aggKey = (UnsafeRow) agg_fastHashMapIter.getKey();
/* 168 */ UnsafeRow agg_aggBuffer = (UnsafeRow) agg_fastHashMapIter.getValue();
/* 169 */ wholestagecodegen_nestedClassInstance.agg_doAggregateWithKeysOutput(agg_aggKey, agg_aggBuffer);
/* 170 */
/* 171 */ if (shouldStop()) return;
/* 172 */ }
/* 173 */ agg_fastHashMap.close();
/* 174 */
/* 175 */ while (agg_mapIter.next()) {
/* 176 */ UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 177 */ UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 178 */ wholestagecodegen_nestedClassInstance.agg_doAggregateWithKeysOutput(agg_aggKey, agg_aggBuffer);
/* 179 */
/* 180 */ if (shouldStop()) return;
/* 181 */ }
/* 182 */
/* 183 */ agg_mapIter.close();
/* 184 */ if (agg_sorter == null) {
/* 185 */ agg_hashMap.free();
/* 186 */ }
/* 187 */ }
/* 188 */
/* 189 */ private wholestagecodegen_NestedClass wholestagecodegen_nestedClassInstance = new wholestagecodegen_NestedClass();
/* 190 */ private agg_NestedClass1 agg_nestedClassInstance1 = new agg_NestedClass1();
/* 191 */ private agg_NestedClass agg_nestedClassInstance = new agg_NestedClass();
/* 192 */
/* 193 */ private class agg_NestedClass1 {
/* 194 */ private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 195 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 196 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 197 */ int inputadapter_value = inputadapter_row.getInt(0);
/* 198 */ boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 199 */ UTF8String inputadapter_value1 = inputadapter_isNull1 ?
/* 200 */ null : (inputadapter_row.getUTF8String(1));
/* 201 */
/* 202 */ agg_nestedClassInstance.agg_doConsume(inputadapter_row, inputadapter_value, inputadapter_value1, inputadapter_isNull1);
/* 203 */ if (shouldStop()) return;
/* 204 */ }
/* 205 */
/* 206 */ agg_fastHashMapIter = agg_fastHashMap.rowIterator();
/* 207 */ agg_mapIter = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).finishAggregate(agg_hashMap, agg_sorter, ((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* peakMemory */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[5] /* spillSize */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[6] /* avgHashProbe */));
/* 208 */
/* 209 */ }
/* 210 */
/* 211 */ }
/* 212 */
/* 213 */ private class wholestagecodegen_NestedClass {
/* 214 */ private void agg_doAggregateWithKeysOutput(UnsafeRow agg_keyTerm, UnsafeRow agg_bufferTerm)
/* 215 */ throws java.io.IOException {
/* 216 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[7] /* numOutputRows */).add(1);
/* 217 */
/* 218 */ boolean agg_isNull35 = agg_keyTerm.isNullAt(0);
/* 219 */ UTF8String agg_value37 = agg_isNull35 ?
/* 220 */ null : (agg_keyTerm.getUTF8String(0));
/* 221 */ boolean agg_isNull36 = agg_bufferTerm.isNullAt(0);
/* 222 */ double agg_value38 = agg_isNull36 ?
/* 223 */ -1.0 : (agg_bufferTerm.getDouble(0));
/* 224 */ boolean agg_isNull37 = agg_bufferTerm.isNullAt(1);
/* 225 */ long agg_value39 = agg_isNull37 ?
/* 226 */ -1L : (agg_bufferTerm.getLong(1));
/* 227 */
/* 228 */ agg_mutableStateArray1[1].reset();
/* 229 */
/* 230 */ agg_mutableStateArray2[1].zeroOutNullBytes();
/* 231 */
/* 232 */ if (agg_isNull35) {
/* 233 */ agg_mutableStateArray2[1].setNullAt(0);
/* 234 */ } else {
/* 235 */ agg_mutableStateArray2[1].write(0, agg_value37);
/* 236 */ }
/* 237 */
/* 238 */ if (agg_isNull36) {
/* 239 */ agg_mutableStateArray2[1].setNullAt(1);
/* 240 */ } else {
/* 241 */ agg_mutableStateArray2[1].write(1, agg_value38);
/* 242 */ }
/* 243 */
/* 244 */ if (agg_isNull37) {
/* 245 */ agg_mutableStateArray2[1].setNullAt(2);
/* 246 */ } else {
/* 247 */ agg_mutableStateArray2[1].write(2, agg_value39);
/* 248 */ }
/* 249 */ agg_mutableStateArray[1].setTotalSize(agg_mutableStateArray1[1].totalSize());
/* 250 */ append(agg_mutableStateArray[1]);
/* 251 */
/* 252 */ }
/* 253 */
/* 254 */ }
/* 255 */
/* 256 */ private class agg_NestedClass {
/* 257 */ private void agg_doConsume(InternalRow inputadapter_row, int agg_expr_0, UTF8String agg_expr_1, boolean agg_exprIsNull_1) throws java.io.IOException {
/* 258 */ UnsafeRow agg_unsafeRowAggBuffer = null;
/* 259 */ UnsafeRow agg_fastAggBuffer = null;
/* 260 */
/* 261 */ if (true) {
/* 262 */ if (!agg_exprIsNull_1) {
/* 263 */ agg_fastAggBuffer = agg_fastHashMap.findOrInsert(
/* 264 */ agg_expr_1);
/* 265 */ }
/* 266 */ }
/* 267 */ // Cannot find the key in fast hash map, try regular hash map.
/* 268 */ if (agg_fastAggBuffer == null) {
/* 269 */ // generate grouping key
/* 270 */ agg_mutableStateArray1[0].reset();
/* 271 */
/* 272 */ agg_mutableStateArray2[0].zeroOutNullBytes();
/* 273 */
/* 274 */ if (agg_exprIsNull_1) {
/* 275 */ agg_mutableStateArray2[0].setNullAt(0);
/* 276 */ } else {
/* 277 */ agg_mutableStateArray2[0].write(0, agg_expr_1);
/* 278 */ }
/* 279 */ agg_mutableStateArray[0].setTotalSize(agg_mutableStateArray1[0].totalSize());
/* 280 */ int agg_value7 = 42;
/* 281 */
/* 282 */ if (!agg_exprIsNull_1) {
/* 283 */ agg_value7 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(agg_expr_1.getBaseObject(), agg_expr_1.getBaseOffset(), agg_expr_1.numBytes(), agg_value7);
/* 284 */ }
/* 285 */ if (true) {
/* 286 */ // try to get the buffer from hash map
/* 287 */ agg_unsafeRowAggBuffer =
/* 288 */ agg_hashMap.getAggregationBufferFromUnsafeRow(agg_mutableStateArray[0], agg_value7);
/* 289 */ }
/* 290 */ // Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
/* 291 */ // aggregation after processing all input rows.
/* 292 */ if (agg_unsafeRowAggBuffer == null) {
/* 293 */ if (agg_sorter == null) {
/* 294 */ agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
/* 295 */ } else {
/* 296 */ agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 297 */ }
/* 298 */
/* 299 */ // the hash map had be spilled, it should have enough memory now,
/* 300 */ // try to allocate buffer again.
/* 301 */ agg_unsafeRowAggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(
/* 302 */ agg_mutableStateArray[0], agg_value7);
/* 303 */ if (agg_unsafeRowAggBuffer == null) {
/* 304 */ // failed to allocate the first page
/* 305 */ throw new OutOfMemoryError("No enough memory for aggregation");
/* 306 */ }
/* 307 */ }
/* 308 */
/* 309 */ }
/* 310 */
/* 311 */ if (agg_fastAggBuffer != null) {
/* 312 */ // common sub-expressions
/* 313 */ boolean agg_isNull21 = false;
/* 314 */ long agg_value23 = -1L;
/* 315 */ if (!false) {
/* 316 */ agg_value23 = (long) agg_expr_0;
/* 317 */ }
/* 318 */ // evaluate aggregate function
/* 319 */ boolean agg_isNull23 = true;
/* 320 */ double agg_value25 = -1.0;
/* 321 */
/* 322 */ boolean agg_isNull24 = agg_fastAggBuffer.isNullAt(0);
/* 323 */ double agg_value26 = agg_isNull24 ?
/* 324 */ -1.0 : (agg_fastAggBuffer.getDouble(0));
/* 325 */ if (!agg_isNull24) {
/* 326 */ agg_agg_isNull25 = true;
/* 327 */ double agg_value27 = -1.0;
/* 328 */ do {
/* 329 */ boolean agg_isNull26 = agg_isNull21;
/* 330 */ double agg_value28 = -1.0;
/* 331 */ if (!agg_isNull21) {
/* 332 */ agg_value28 = (double) agg_value23;
/* 333 */ }
/* 334 */ if (!agg_isNull26) {
/* 335 */ agg_agg_isNull25 = false;
/* 336 */ agg_value27 = agg_value28;
/* 337 */ continue;
/* 338 */ }
/* 339 */
/* 340 */ boolean agg_isNull27 = false;
/* 341 */ double agg_value29 = -1.0;
/* 342 */ if (!false) {
/* 343 */ agg_value29 = (double) 0;
/* 344 */ }
/* 345 */ if (!agg_isNull27) {
/* 346 */ agg_agg_isNull25 = false;
/* 347 */ agg_value27 = agg_value29;
/* 348 */ continue;
/* 349 */ }
/* 350 */
/* 351 */ } while (false);
/* 352 */
/* 353 */ agg_isNull23 = false; // resultCode could change nullability.
/* 354 */ agg_value25 = agg_value26 + agg_value27;
/* 355 */
/* 356 */ }
/* 357 */ boolean agg_isNull29 = false;
/* 358 */ long agg_value31 = -1L;
/* 359 */ if (!false && agg_isNull21) {
/* 360 */ boolean agg_isNull31 = agg_fastAggBuffer.isNullAt(1);
/* 361 */ long agg_value33 = agg_isNull31 ?
/* 362 */ -1L : (agg_fastAggBuffer.getLong(1));
/* 363 */ agg_isNull29 = agg_isNull31;
/* 364 */ agg_value31 = agg_value33;
/* 365 */ } else {
/* 366 */ boolean agg_isNull32 = true;
/* 367 */ long agg_value34 = -1L;
/* 368 */
/* 369 */ boolean agg_isNull33 = agg_fastAggBuffer.isNullAt(1);
/* 370 */ long agg_value35 = agg_isNull33 ?
/* 371 */ -1L : (agg_fastAggBuffer.getLong(1));
/* 372 */ if (!agg_isNull33) {
/* 373 */ agg_isNull32 = false; // resultCode could change nullability.
/* 374 */ agg_value34 = agg_value35 + 1L;
/* 375 */
/* 376 */ }
/* 377 */ agg_isNull29 = agg_isNull32;
/* 378 */ agg_value31 = agg_value34;
/* 379 */ }
/* 380 */ // update fast row
/* 381 */ if (!agg_isNull23) {
/* 382 */ agg_fastAggBuffer.setDouble(0, agg_value25);
/* 383 */ } else {
/* 384 */ agg_fastAggBuffer.setNullAt(0);
/* 385 */ }
/* 386 */
/* 387 */ if (!agg_isNull29) {
/* 388 */ agg_fastAggBuffer.setLong(1, agg_value31);
/* 389 */ } else {
/* 390 */ agg_fastAggBuffer.setNullAt(1);
/* 391 */ }
/* 392 */ } else {
/* 393 */ // common sub-expressions
/* 394 */ boolean agg_isNull7 = false;
/* 395 */ long agg_value9 = -1L;
/* 396 */ if (!false) {
/* 397 */ agg_value9 = (long) agg_expr_0;
/* 398 */ }
/* 399 */ // evaluate aggregate function
/* 400 */ boolean agg_isNull9 = true;
/* 401 */ double agg_value11 = -1.0;
/* 402 */
/* 403 */ boolean agg_isNull10 = agg_unsafeRowAggBuffer.isNullAt(0);
/* 404 */ double agg_value12 = agg_isNull10 ?
/* 405 */ -1.0 : (agg_unsafeRowAggBuffer.getDouble(0));
/* 406 */ if (!agg_isNull10) {
/* 407 */ agg_agg_isNull11 = true;
/* 408 */ double agg_value13 = -1.0;
/* 409 */ do {
/* 410 */ boolean agg_isNull12 = agg_isNull7;
/* 411 */ double agg_value14 = -1.0;
/* 412 */ if (!agg_isNull7) {
/* 413 */ agg_value14 = (double) agg_value9;
/* 414 */ }
/* 415 */ if (!agg_isNull12) {
/* 416 */ agg_agg_isNull11 = false;
/* 417 */ agg_value13 = agg_value14;
/* 418 */ continue;
/* 419 */ }
/* 420 */
/* 421 */ boolean agg_isNull13 = false;
/* 422 */ double agg_value15 = -1.0;
/* 423 */ if (!false) {
/* 424 */ agg_value15 = (double) 0;
/* 425 */ }
/* 426 */ if (!agg_isNull13) {
/* 427 */ agg_agg_isNull11 = false;
/* 428 */ agg_value13 = agg_value15;
/* 429 */ continue;
/* 430 */ }
/* 431 */
/* 432 */ } while (false);
/* 433 */
/* 434 */ agg_isNull9 = false; // resultCode could change nullability.
/* 435 */ agg_value11 = agg_value12 + agg_value13;
/* 436 */
/* 437 */ }
/* 438 */ boolean agg_isNull15 = false;
/* 439 */ long agg_value17 = -1L;
/* 440 */ if (!false && agg_isNull7) {
/* 441 */ boolean agg_isNull17 = agg_unsafeRowAggBuffer.isNullAt(1);
/* 442 */ long agg_value19 = agg_isNull17 ?
/* 443 */ -1L : (agg_unsafeRowAggBuffer.getLong(1));
/* 444 */ agg_isNull15 = agg_isNull17;
/* 445 */ agg_value17 = agg_value19;
/* 446 */ } else {
/* 447 */ boolean agg_isNull18 = true;
/* 448 */ long agg_value20 = -1L;
/* 449 */
/* 450 */ boolean agg_isNull19 = agg_unsafeRowAggBuffer.isNullAt(1);
/* 451 */ long agg_value21 = agg_isNull19 ?
/* 452 */ -1L : (agg_unsafeRowAggBuffer.getLong(1));
/* 453 */ if (!agg_isNull19) {
/* 454 */ agg_isNull18 = false; // resultCode could change nullability.
/* 455 */ agg_value20 = agg_value21 + 1L;
/* 456 */
/* 457 */ }
/* 458 */ agg_isNull15 = agg_isNull18;
/* 459 */ agg_value17 = agg_value20;
/* 460 */ }
/* 461 */ // update unsafe row buffer
/* 462 */ if (!agg_isNull9) {
/* 463 */ agg_unsafeRowAggBuffer.setDouble(0, agg_value11);
/* 464 */ } else {
/* 465 */ agg_unsafeRowAggBuffer.setNullAt(0);
/* 466 */ }
/* 467 */
/* 468 */ if (!agg_isNull15) {
/* 469 */ agg_unsafeRowAggBuffer.setLong(1, agg_value17);
/* 470 */ } else {
/* 471 */ agg_unsafeRowAggBuffer.setNullAt(1);
/* 472 */ }
/* 473 */
/* 474 */ }
/* 475 */
/* 476 */ }
/* 477 */
/* 478 */ }
/* 479 */
/* 480 */ }
```
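Stripped of the aggregation logic, the shape of the 480 lines above that matters for this bug is small: the outer class extends `BufferedRowIterator`, while the split-out work (`agg_NestedClass1` and friends) lives in inner classes that invoke the inherited methods directly. A reduced sketch, assuming a Spark 2.3 classpath; names and bodies are illustrative, not actual generated output:
```
import java.io.IOException;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.BufferedRowIterator;

final class ReducedGeneratedIterator extends BufferedRowIterator {
  public void init(int index, scala.collection.Iterator<InternalRow>[] inputs) { }

  private class NestedPart {  // plays the role of agg_NestedClass1
    private void doAggregate() throws IOException {
      // stopEarly() and shouldStop() are inherited by the *outer* class;
      // before this patch both were protected, so Janino's direct calls
      // from an inner class failed (line 195 above is the stopEarly()
      // call, line 203 the shouldStop() call).
      while (!stopEarly()) {
        // ... consume one input row, update the aggregation buffers ...
        if (shouldStop()) return;
        break;  // sketch: single pass; the real code loops over its input
      }
    }
  }

  protected void processNext() throws IOException {
    new NestedPart().doAggregate();
  }
}
```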
## How was this patch tested?
Added a UT to `WholeStageCodegenSuite` that chains 70 aggregations, so the generated code grows past the class size threshold and is split into nested classes.
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Closes #20779 from kiszk/SPARK-23598.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1098933b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1098933b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1098933b
Branch: refs/heads/master
Commit: 1098933b0ac5cdb18101d3aebefa773c2ce05a50
Parents: 918fb9b
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Authored: Tue Mar 13 23:04:16 2018 +0100
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Tue Mar 13 23:04:16 2018 +0100
----------------------------------------------------------------------
.../apache/spark/sql/execution/BufferedRowIterator.java | 12 ++++++++----
.../spark/sql/execution/WholeStageCodegenSuite.scala | 12 ++++++++++++
2 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1098933b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
index 730a4ae..74c9c05 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
@@ -62,10 +62,14 @@ public abstract class BufferedRowIterator {
    */
   public abstract void init(int index, Iterator<InternalRow>[] iters);
 
+  /*
+   * Attributes of the following four methods are public. Thus, they can be also accessed from
+   * methods in inner classes. See SPARK-23598
+   */
   /**
    * Append a row to currentRows.
    */
-  protected void append(InternalRow row) {
+  public void append(InternalRow row) {
     currentRows.add(row);
   }
 
@@ -75,7 +79,7 @@ public abstract class BufferedRowIterator {
    * If it returns true, the caller should exit the loop that [[InputAdapter]] generates.
    * This interface is mainly used to limit the number of input rows.
    */
-  protected boolean stopEarly() {
+  public boolean stopEarly() {
     return false;
   }
 
@@ -84,14 +88,14 @@ public abstract class BufferedRowIterator {
    *
    * If it returns true, the caller should exit the loop (return from processNext()).
    */
-  protected boolean shouldStop() {
+  public boolean shouldStop() {
     return !currentRows.isEmpty();
   }
 
   /**
    * Increase the peak execution memory for current task.
    */
-  protected void incPeakExecutionMemory(long size) {
+  public void incPeakExecutionMemory(long size) {
     TaskContext.get().taskMetrics().incPeakExecutionMemory(size);
   }
http://git-wip-us.apache.org/repos/asf/spark/blob/1098933b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 0fb9dd2..4b40e4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -32,6 +32,8 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
 class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
 
+  import testImplicits._
+
   test("range/filter should be combined") {
     val df = spark.range(10).filter("id = 1").selectExpr("id + 1")
     val plan = df.queryExecution.executedPlan
@@ -307,4 +309,14 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
       // a different query can result in codegen cache miss, that's by design
     }
   }
+
+  test("SPARK-23598: Codegen working for lots of aggregation operations without runtime errors") {
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+      var df = Seq((8, "bat"), (15, "mouse"), (5, "horse")).toDF("age", "name")
+      for (i <- 0 until 70) {
+        df = df.groupBy("name").agg(avg("age").alias("age"))
+      }
+      assert(df.limit(1).collect() === Array(Row("bat", 8.0)))
+    }
+  }
 }