Posted to issues@spark.apache.org by "Takeshi Yamamuro (JIRA)" <ji...@apache.org> on 2017/08/16 03:17:00 UTC

[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses

    [ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16128243#comment-16128243 ] 

Takeshi Yamamuro commented on SPARK-18394:
------------------------------------------

Any update? I checked and found that master still has this issue; I just ran the query above and dumped the output names at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala#L102:
{code}
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_QUANTITY#9015,L_RETURNFLAG#9019,l_shipdate#9021,L_TAX#9018,L_DISCOUNT#9017,L_LINESTATUS#9020,L_EXTENDEDPRICE#9016
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_RETURNFLAG#9142,L_DISCOUNT#9140,L_EXTENDEDPRICE#9139,L_QUANTITY#9138,L_LINESTATUS#9143,l_shipdate#9144,L_TAX#9141
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_QUANTITY#9305,L_TAX#9308,l_shipdate#9311,L_DISCOUNT#9307,L_RETURNFLAG#9309,L_LINESTATUS#9310,L_EXTENDEDPRICE#9306
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_EXTENDEDPRICE#9451,L_QUANTITY#9450,L_RETURNFLAG#9454,L_TAX#9453,L_DISCOUNT#9452,l_shipdate#9456,L_LINESTATUS#9455
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_LINESTATUS#9600,l_shipdate#9601,L_DISCOUNT#9597,L_TAX#9598,L_EXTENDEDPRICE#9596,L_RETURNFLAG#9599,L_QUANTITY#9595
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_QUANTITY#9740,L_TAX#9743,l_shipdate#9746,L_DISCOUNT#9742,L_EXTENDEDPRICE#9741,L_LINESTATUS#9745,L_RETURNFLAG#9744
...
{code}
The attribute order is different, so Spark generates different code in `GenerateColumnAccessor`.
Also, I quickly checked that `AttributeSet.toSeq` outputs attributes in a different order:
{code}
scala> import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, ExprId}
scala> import org.apache.spark.sql.types.IntegerType
scala> val attr1 = AttributeReference("c1", IntegerType)(exprId = ExprId(1098))
scala> val attr2 = AttributeReference("c2", IntegerType)(exprId = ExprId(107))
scala> val attr3 = AttributeReference("c3", IntegerType)(exprId = ExprId(838))
scala> val attrSetA = AttributeSet(attr1 :: attr2 :: attr3 :: Nil)
scala> val attr4 = AttributeReference("c4", IntegerType)(exprId = ExprId(389))
scala> val attr5 = AttributeReference("c5", IntegerType)(exprId = ExprId(89329))
scala> val attrSetB = AttributeSet(attr4 :: attr5 :: Nil)
scala> (attrSetA ++ attrSetB).toSeq
res1: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = WrappedArray(c3#838, c4#389, c2#107, c5#89329, c1#1098)

scala> val attr1 = AttributeReference("c1", IntegerType)(exprId = ExprId(392))
scala> val attr2 = AttributeReference("c2", IntegerType)(exprId = ExprId(92))
scala> val attr3 = AttributeReference("c3", IntegerType)(exprId = ExprId(87))
scala> val attrSetA = AttributeSet(attr1 :: attr2 :: attr3 :: Nil)
scala> val attr4 = AttributeReference("c4", IntegerType)(exprId = ExprId(9023920))
scala> val attr5 = AttributeReference("c5", IntegerType)(exprId = ExprId(522))
scala> val attrSetB = AttributeSet(attr4 :: attr5 :: Nil)
scala> (attrSetA ++ attrSetB).toSeq
res2: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = WrappedArray(c3#87, c1#392, c5#522, c2#92, c4#9023920)
{code}
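The variation above comes from `AttributeSet` being backed by a hash set keyed on the attributes' `ExprId`s, so iteration order is a function of the id values, and fresh ids are minted every time the query is planned. A stand-alone sketch of the mechanism (plain Scala collections, no Spark required; the ids are just the ones from the REPL session above):
{code}
// Illustration only: hash-set iteration order depends on element hashes,
// not on insertion order, so the "same" attributes inserted in the same
// order can come back in different relative positions when their ids differ.
object HashOrderDemo {
  def main(args: Array[String]): Unit = {
    // Two "runs" of the same query: same logical attributes, different fresh ids.
    val run1Ids = Set(1098L, 107L, 838L, 389L, 89329L)
    val run2Ids = Set(392L, 92L, 87L, 9023920L, 522L)
    // Each run enumerates its own ids in a hash-determined order, so the
    // i-th element of run1Ids.toSeq need not correspond to the i-th of run2Ids.toSeq.
    println(run1Ids.toSeq.mkString(","))
    println(run2Ids.toSeq.mkString(","))
  }
}
{code}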

As suggested, to fix this, `AttributeSet.toSeq` needs to output attributes in a consistent order, as in:
https://github.com/apache/spark/compare/master...maropu:SPARK-18394
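A minimal sketch of the idea (with hypothetical stand-ins for Spark's `ExprId`/`Attribute`; the actual patch in the branch above may differ): sort on `exprId` instead of relying on hash-set iteration order. Since ids are allocated sequentially during a single query compilation, their relative order is the same on every run even though the absolute values differ, so the generated code becomes identical:
{code}
// Hypothetical simplified types, for illustration only.
case class ExprId(id: Long)
case class Attr(name: String, exprId: ExprId)

object ConsistentToSeq {
  // Deterministic replacement for hash-set iteration order:
  // sort by exprId, which is allocated in the same relative order each run.
  def toSeq(attrs: Set[Attr]): Seq[Attr] = attrs.toSeq.sortBy(_.exprId.id)

  def main(args: Array[String]): Unit = {
    // Two "runs" of the same query: same columns, different fresh ids,
    // but ids minted in the same relative order.
    val run1 = Set(Attr("c1", ExprId(1098)), Attr("c2", ExprId(1099)), Attr("c3", ExprId(1100)))
    val run2 = Set(Attr("c1", ExprId(392)), Attr("c2", ExprId(393)), Attr("c3", ExprId(394)))
    // Identical attribute order -> identical generated code -> cache hit.
    assert(toSeq(run1).map(_.name) == toSeq(run2).map(_.name))
    println(toSeq(run1).map(_.name).mkString(","))
  }
}
{code}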
 

> Executing the same query twice in a row results in CodeGenerator cache misses
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-18394
>                 URL: https://issues.apache.org/jira/browse/SPARK-18394
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>         Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop
>            Reporter: Jonny Serencsa
>
> Executing the query:
> {noformat}
> select
>     l_returnflag,
>     l_linestatus,
>     sum(l_quantity) as sum_qty,
>     sum(l_extendedprice) as sum_base_price,
>     sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
>     sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
>     avg(l_quantity) as avg_qty,
>     avg(l_extendedprice) as avg_price,
>     avg(l_discount) as avg_disc,
>     count(*) as count_order
> from
>     lineitem_1_row
> where
>     l_shipdate <= date_sub('1998-12-01', '90')
> group by
>     l_returnflag,
>     l_linestatus
> ;
> {noformat}
> twice (in succession), will result in CodeGenerator cache misses in BOTH executions. Since the query is identical, I would expect the same code to be generated. 
> Turns out, the generated code is not exactly the same, resulting in cache misses when performing the lookup in the CodeGenerator cache. Yet, the code is equivalent. 
> Below is (some portion of the) generated code for two runs of the query:
> run-1
> {noformat}
> import java.nio.ByteBuffer;
> import java.nio.ByteOrder;
> import scala.collection.Iterator;
> import org.apache.spark.sql.types.DataType;
> import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
> import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
> import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;
> public SpecificColumnarIterator generate(Object[] references) {
> return new SpecificColumnarIterator();
> }
> class SpecificColumnarIterator extends org.apache.spark.sql.execution.columnar.ColumnarIterator {
> private ByteOrder nativeOrder = null;
> private byte[][] buffers = null;
> private UnsafeRow unsafeRow = new UnsafeRow(7);
> private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
> private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7);
> private MutableUnsafeRow mutableRow = null;
> private int currentRow = 0;
> private int numRowsInBatch = 0;
> private scala.collection.Iterator input = null;
> private DataType[] columnTypes = null;
> private int[] columnIndexes = null;
> private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor;
> private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor1;
> private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor2;
> private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor3;
> private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor4;
> private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor5;
> private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor6;
> public SpecificColumnarIterator() {
> this.nativeOrder = ByteOrder.nativeOrder();
> this.buffers = new byte[7][];
> this.mutableRow = new MutableUnsafeRow(rowWriter);
> }
> public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
> this.input = input;
> this.columnTypes = columnTypes;
> this.columnIndexes = columnIndexes;
> }
> public boolean hasNext() {
> if (currentRow < numRowsInBatch) {
> return true;
> }
> if (!input.hasNext()) {
> return false;
> }
> org.apache.spark.sql.execution.columnar.CachedBatch batch = (org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
> currentRow = 0;
> numRowsInBatch = batch.numRows();
> for (int i = 0; i < columnIndexes.length; i ++) {
> buffers[i] = batch.buffers()[columnIndexes[i]];
> }
> accessor = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder));
> accessor1 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder));
> accessor2 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder));
> accessor3 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder));
> accessor4 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder));
> accessor5 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder));
> accessor6 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder));
> return hasNext();
> }
> public InternalRow next() {
> currentRow += 1;
> bufferHolder.reset();
> rowWriter.zeroOutNullBytes();
> accessor.extractTo(mutableRow, 0);
> accessor1.extractTo(mutableRow, 1);
> accessor2.extractTo(mutableRow, 2);
> accessor3.extractTo(mutableRow, 3);
> accessor4.extractTo(mutableRow, 4);
> accessor5.extractTo(mutableRow, 5);
> accessor6.extractTo(mutableRow, 6);
> unsafeRow.setTotalSize(bufferHolder.totalSize());
> return unsafeRow;
> }
> }
> {noformat}
> run-2:
> {noformat}
> import java.nio.ByteBuffer;
> import java.nio.ByteOrder;
> import scala.collection.Iterator;
> import org.apache.spark.sql.types.DataType;
> import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
> import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
> import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;
> public SpecificColumnarIterator generate(Object[] references) {
> return new SpecificColumnarIterator();
> }
> class SpecificColumnarIterator extends org.apache.spark.sql.execution.columnar.ColumnarIterator {
> private ByteOrder nativeOrder = null;
> private byte[][] buffers = null;
> private UnsafeRow unsafeRow = new UnsafeRow(7);
> private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
> private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7);
> private MutableUnsafeRow mutableRow = null;
> private int currentRow = 0;
> private int numRowsInBatch = 0;
> private scala.collection.Iterator input = null;
> private DataType[] columnTypes = null;
> private int[] columnIndexes = null;
> private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor;
> private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor1;
> private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor2;
> private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor3;
> private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor4;
> private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor5;
> private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor6;
> public SpecificColumnarIterator() {
> this.nativeOrder = ByteOrder.nativeOrder();
> this.buffers = new byte[7][];
> this.mutableRow = new MutableUnsafeRow(rowWriter);
> }
> public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
> this.input = input;
> this.columnTypes = columnTypes;
> this.columnIndexes = columnIndexes;
> }
> public boolean hasNext() {
> if (currentRow < numRowsInBatch) {
> return true;
> }
> if (!input.hasNext()) {
> return false;
> }
> org.apache.spark.sql.execution.columnar.CachedBatch batch = (org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
> currentRow = 0;
> numRowsInBatch = batch.numRows();
> for (int i = 0; i < columnIndexes.length; i ++) {
> buffers[i] = batch.buffers()[columnIndexes[i]];
> }
> accessor = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder));
> accessor1 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder));
> accessor2 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder));
> accessor3 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder));
> accessor4 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder));
> accessor5 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder));
> accessor6 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder));
> return hasNext();
> }
> public InternalRow next() {
> currentRow += 1;
> bufferHolder.reset();
> rowWriter.zeroOutNullBytes();
> accessor.extractTo(mutableRow, 0);
> accessor1.extractTo(mutableRow, 1);
> accessor2.extractTo(mutableRow, 2);
> accessor3.extractTo(mutableRow, 3);
> accessor4.extractTo(mutableRow, 4);
> accessor5.extractTo(mutableRow, 5);
> accessor6.extractTo(mutableRow, 6);
> unsafeRow.setTotalSize(bufferHolder.totalSize());
> return unsafeRow;
> }
> }
> {noformat}
> Diff-ing the two files reveals that the "accessor*" variable definitions are permuted. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
