You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/06 06:47:26 UTC
spark git commit: [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream
Repository: spark
Updated Branches:
refs/heads/master 75b168fd3 -> 14a3bb3a0
[SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream
## What changes were proposed in this pull request?
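Corrects the offsetInBytes calculation in UnsafeRow.writeToStream. For a row backed by a byte array, the offset into that array was computed as `Platform.BYTE_ARRAY_OFFSET - baseOffset`, i.e. with the sign reversed. It was therefore only correct when the row started at the very beginning of the array. The fix computes `baseOffset - Platform.BYTE_ARRAY_OFFSET`. Known failures include writes to some DataSources that have their own SparkPlan implementations and cause an Exchange during writes.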
## How was this patch tested?
Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over a byte array with a non-zero offset.
Author: Sumedh Wale <sw...@snappydata.io>
Closes #18535 from sumwale/SPARK-21312.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14a3bb3a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14a3bb3a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14a3bb3a
Branch: refs/heads/master
Commit: 14a3bb3a008c302aac908d7deaf0942a98c63be7
Parents: 75b168f
Author: Sumedh Wale <sw...@snappydata.io>
Authored: Thu Jul 6 14:47:22 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jul 6 14:47:22 2017 +0800
----------------------------------------------------------------------
.../spark/sql/catalyst/expressions/UnsafeRow.java | 2 +-
.../scala/org/apache/spark/sql/UnsafeRowSuite.scala | 13 +++++++++++++
2 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/14a3bb3a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 86de909..56994fa 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
*/
public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException {
if (baseObject instanceof byte[]) {
- int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset);
+ int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET);
out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes);
} else {
int dataRemaining = sizeInBytes;
http://git-wip-us.apache.org/repos/asf/spark/blob/14a3bb3a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
index a32763d..a5f904c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
@@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite {
MemoryAllocator.UNSAFE.free(offheapRowPage)
}
}
+ val (bytesFromArrayBackedRowWithOffset, field0StringFromArrayBackedRowWithOffset) = {
+ val baos = new ByteArrayOutputStream()
+ val numBytes = arrayBackedUnsafeRow.getSizeInBytes
+ val bytesWithOffset = new Array[Byte](numBytes + 100)
+ System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 0,
+ bytesWithOffset, 100, numBytes)
+ val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields())
+ arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 100, numBytes)
+ arrayBackedRow.writeToStream(baos, null)
+ (baos.toByteArray, arrayBackedRow.getString(0))
+ }
assert(bytesFromArrayBackedRow === bytesFromOffheapRow)
assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow)
+ assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset)
+ assert(field0StringFromArrayBackedRow === field0StringFromArrayBackedRowWithOffset)
}
test("calling getDouble() and getFloat() on null columns") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org