You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Jonny Serencsa (JIRA)" <ji...@apache.org> on 2016/11/10 01:20:59 UTC

[jira] [Created] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses

Jonny Serencsa created SPARK-18394:
--------------------------------------

             Summary: Executing the same query twice in a row results in CodeGenerator cache misses
                 Key: SPARK-18394
                 URL: https://issues.apache.org/jira/browse/SPARK-18394
             Project: Spark
          Issue Type: Bug
          Components: SQL
         Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop
            Reporter: Jonny Serencsa


Executing the query:
{noformat}
select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem_1_row
where
    l_shipdate <= date_sub('1998-12-01', '90')
group by
    l_returnflag,
    l_linestatus
;
{noformat}
twice (in succession), will result in CodeGenerator cache misses in BOTH executions. Since the query is identical, I would expect the same code to be generated. 

Turns out, the generated code is not exactly the same, resulting in cache misses when performing the lookup in the CodeGenerator cache. Yet, the code is equivalent. 

Below is (some portion of the) generated code for two runs of the query:

run-1
{noformat}
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import scala.collection.Iterator;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;

public SpecificColumnarIterator generate(Object[] references) {
return new SpecificColumnarIterator();
}

class SpecificColumnarIterator extends org.apache.spark.sql.execution.columnar.ColumnarIterator {

private ByteOrder nativeOrder = null;
private byte[][] buffers = null;
private UnsafeRow unsafeRow = new UnsafeRow(7);
private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7);
private MutableUnsafeRow mutableRow = null;

private int currentRow = 0;
private int numRowsInBatch = 0;

private scala.collection.Iterator input = null;
private DataType[] columnTypes = null;
private int[] columnIndexes = null;

private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor1;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor2;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor3;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor4;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor5;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor6;

public SpecificColumnarIterator() {
this.nativeOrder = ByteOrder.nativeOrder();
this.buffers = new byte[7][];
this.mutableRow = new MutableUnsafeRow(rowWriter);
}

public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
this.input = input;
this.columnTypes = columnTypes;
this.columnIndexes = columnIndexes;
}



public boolean hasNext() {
if (currentRow < numRowsInBatch) {
return true;
}
if (!input.hasNext()) {
return false;
}

org.apache.spark.sql.execution.columnar.CachedBatch batch = (org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
currentRow = 0;
numRowsInBatch = batch.numRows();
for (int i = 0; i < columnIndexes.length; i ++) {
buffers[i] = batch.buffers()[columnIndexes[i]];
}
accessor = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder));
accessor1 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder));
accessor2 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder));
accessor3 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder));
accessor4 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder));
accessor5 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder));
accessor6 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder));

return hasNext();
}

public InternalRow next() {
currentRow += 1;
bufferHolder.reset();
rowWriter.zeroOutNullBytes();
accessor.extractTo(mutableRow, 0);
accessor1.extractTo(mutableRow, 1);
accessor2.extractTo(mutableRow, 2);
accessor3.extractTo(mutableRow, 3);
accessor4.extractTo(mutableRow, 4);
accessor5.extractTo(mutableRow, 5);
accessor6.extractTo(mutableRow, 6);
unsafeRow.setTotalSize(bufferHolder.totalSize());
return unsafeRow;
}
}
{noformat}

run-2:
{noformat}
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import scala.collection.Iterator;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;

public SpecificColumnarIterator generate(Object[] references) {
return new SpecificColumnarIterator();
}

class SpecificColumnarIterator extends org.apache.spark.sql.execution.columnar.ColumnarIterator {

private ByteOrder nativeOrder = null;
private byte[][] buffers = null;
private UnsafeRow unsafeRow = new UnsafeRow(7);
private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7);
private MutableUnsafeRow mutableRow = null;

private int currentRow = 0;
private int numRowsInBatch = 0;

private scala.collection.Iterator input = null;
private DataType[] columnTypes = null;
private int[] columnIndexes = null;

private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor1;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor2;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor3;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor4;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor5;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor6;

public SpecificColumnarIterator() {
this.nativeOrder = ByteOrder.nativeOrder();
this.buffers = new byte[7][];
this.mutableRow = new MutableUnsafeRow(rowWriter);
}

public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
this.input = input;
this.columnTypes = columnTypes;
this.columnIndexes = columnIndexes;
}



public boolean hasNext() {
if (currentRow < numRowsInBatch) {
return true;
}
if (!input.hasNext()) {
return false;
}

org.apache.spark.sql.execution.columnar.CachedBatch batch = (org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
currentRow = 0;
numRowsInBatch = batch.numRows();
for (int i = 0; i < columnIndexes.length; i ++) {
buffers[i] = batch.buffers()[columnIndexes[i]];
}
accessor = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder));
accessor1 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder));
accessor2 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder));
accessor3 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder));
accessor4 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder));
accessor5 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder));
accessor6 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder));

return hasNext();
}

public InternalRow next() {
currentRow += 1;
bufferHolder.reset();
rowWriter.zeroOutNullBytes();
accessor.extractTo(mutableRow, 0);
accessor1.extractTo(mutableRow, 1);
accessor2.extractTo(mutableRow, 2);
accessor3.extractTo(mutableRow, 3);
accessor4.extractTo(mutableRow, 4);
accessor5.extractTo(mutableRow, 5);
accessor6.extractTo(mutableRow, 6);
unsafeRow.setTotalSize(bufferHolder.totalSize());
return unsafeRow;
}
}
{noformat}

Diff-ing the two files reveals that the "accessor*" variable definitions are permuted. 




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org