You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/27 07:27:29 UTC
spark git commit: [SPARK-21271][SQL] Ensure Unsafe.sizeInBytes is a
multiple of 8
Repository: spark
Updated Branches:
refs/heads/master cfb25b27c -> ebbe589d1
[SPARK-21271][SQL] Ensure Unsafe.sizeInBytes is a multiple of 8
## What changes were proposed in this pull request?
This PR ensures that `UnsafeRow.sizeInBytes` is always a multiple of 8. If it is not, `UnsafeRow.hashCode` causes an assertion violation.
## How was this patch tested?
Will add test cases
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Closes #18503 from kiszk/SPARK-21271.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ebbe589d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ebbe589d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ebbe589d
Branch: refs/heads/master
Commit: ebbe589d12434bc108672268bee05a7b7e571ee6
Parents: cfb25b2
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Authored: Thu Jul 27 15:27:24 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jul 27 15:27:24 2017 +0800
----------------------------------------------------------------------
.../expressions/FixedLengthRowBasedKeyValueBatch.java | 6 +++---
.../spark/sql/catalyst/expressions/UnsafeRow.java | 2 ++
.../VariableLengthRowBasedKeyValueBatch.java | 6 +++---
.../spark/sql/execution/UnsafeExternalRowSorter.java | 7 ++++---
.../spark/sql/execution/UnsafeKVExternalSorter.java | 6 +++---
.../streaming/state/HDFSBackedStateStoreProvider.scala | 12 ++++++++++--
6 files changed, 25 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
index a88a315..df52f9c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
@@ -62,7 +62,7 @@ public final class FixedLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatc
keyRowId = numRows;
keyRow.pointTo(base, recordOffset, klen);
- valueRow.pointTo(base, recordOffset + klen, vlen + 4);
+ valueRow.pointTo(base, recordOffset + klen, vlen);
numRows++;
return valueRow;
}
@@ -95,7 +95,7 @@ public final class FixedLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatc
getKeyRow(rowId);
}
assert(rowId >= 0);
- valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen + 4);
+ valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen);
return valueRow;
}
@@ -131,7 +131,7 @@ public final class FixedLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatc
}
key.pointTo(base, offsetInPage, klen);
- value.pointTo(base, offsetInPage + klen, vlen + 4);
+ value.pointTo(base, offsetInPage + klen, vlen);
offsetInPage += recordLength;
recordsInPage -= 1;
http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 56994fa..ec947d7 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -167,6 +167,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
*/
public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
+ assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
this.baseObject = baseObject;
this.baseOffset = baseOffset;
this.sizeInBytes = sizeInBytes;
@@ -183,6 +184,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
}
public void setTotalSize(int sizeInBytes) {
+ assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
this.sizeInBytes = sizeInBytes;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
index ea4f984..905e682 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
@@ -65,7 +65,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB
keyRowId = numRows;
keyRow.pointTo(base, recordOffset + 8, klen);
- valueRow.pointTo(base, recordOffset + 8 + klen, vlen + 4);
+ valueRow.pointTo(base, recordOffset + 8 + klen, vlen);
numRows++;
return valueRow;
}
@@ -102,7 +102,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB
long offset = keyRow.getBaseOffset();
int klen = keyRow.getSizeInBytes();
int vlen = Platform.getInt(base, offset - 8) - klen - 4;
- valueRow.pointTo(base, offset + klen, vlen + 4);
+ valueRow.pointTo(base, offset + klen, vlen);
return valueRow;
}
@@ -146,7 +146,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB
currentvlen = totalLength - currentklen;
key.pointTo(base, offsetInPage + 8, currentklen);
- value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen + 4);
+ value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen);
offsetInPage += 8 + totalLength + 8;
recordsInPage -= 1;
http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index aadfcaa..53b0886 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -208,9 +208,10 @@ public final class UnsafeExternalRowSorter {
@Override
public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
- // TODO: Why are the sizes -1?
- row1.pointTo(baseObj1, baseOff1, -1);
- row2.pointTo(baseObj2, baseOff2, -1);
+ // Note that since ordering doesn't need the total length of the record, we just pass 0
+ // into the row.
+ row1.pointTo(baseObj1, baseOff1, 0);
+ row2.pointTo(baseObj2, baseOff2, 0);
return ordering.compare(row1, row2);
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index ee5bcfd..d8acf11 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -238,10 +238,10 @@ public final class UnsafeKVExternalSorter {
@Override
public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
- // Note that since ordering doesn't need the total length of the record, we just pass -1
+ // Note that since ordering doesn't need the total length of the record, we just pass 0
// into the row.
- row1.pointTo(baseObj1, baseOff1 + 4, -1);
- row2.pointTo(baseObj2, baseOff2 + 4, -1);
+ row1.pointTo(baseObj1, baseOff1 + 4, 0);
+ row2.pointTo(baseObj2, baseOff2 + 4, 0);
return ordering.compare(row1, row2);
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index fa4c99c..e0c2e94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -369,7 +369,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
val valueRowBuffer = new Array[Byte](valueSize)
ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
val valueRow = new UnsafeRow(valueSchema.fields.length)
- valueRow.pointTo(valueRowBuffer, valueSize)
+ // If valueSize in an existing file is not a multiple of 8, floor it to a multiple of 8.
+ // This is a workaround for the following:
+ // Prior to Spark 2.3, `RowBasedKeyValueBatch` mistakenly appended 4 bytes to the value row,
+ // and those extra bytes were persisted into the checkpoint data.
+ valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
map.put(keyRow, valueRow)
}
}
@@ -433,7 +437,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
val valueRowBuffer = new Array[Byte](valueSize)
ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
val valueRow = new UnsafeRow(valueSchema.fields.length)
- valueRow.pointTo(valueRowBuffer, valueSize)
+ // If valueSize in an existing file is not a multiple of 8, floor it to a multiple of 8.
+ // This is a workaround for the following:
+ // Prior to Spark 2.3, `RowBasedKeyValueBatch` mistakenly appended 4 bytes to the value row,
+ // and those extra bytes were persisted into the checkpoint data.
+ valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
map.put(keyRow, valueRow)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org