Posted to commits@spark.apache.org by we...@apache.org on 2017/07/27 07:27:29 UTC

spark git commit: [SPARK-21271][SQL] Ensure Unsafe.sizeInBytes is a multiple of 8

Repository: spark
Updated Branches:
  refs/heads/master cfb25b27c -> ebbe589d1


[SPARK-21271][SQL] Ensure Unsafe.sizeInBytes is a multiple of 8

## What changes were proposed in this pull request?

This PR ensures that `Unsafe.sizeInBytes` is a multiple of 8. If this is not satisfied, `Unsafe.hashCode` causes an assertion violation.
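
A minimal sketch (not part of this patch) of the misalignment being guarded against, assuming JVM assertions are enabled with `-ea`. The 20-byte size mimics the stray 4 bytes that the old `vlen + 4` call sites attached to a 16-byte single-field row; before this change the word-oriented Murmur3 hash behind `UnsafeRow.hashCode` raised the assertion, while with this change the new check in `pointTo` catches the misaligned size earlier:

```java
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;

public class SizeAlignmentRepro {
  public static void main(String[] args) {
    // 8-byte null bitset + 8-byte fixed-width slot = 16 bytes for one field;
    // declaring 20 bytes mimics the extra 4 bytes described in the JIRA.
    byte[] buffer = new byte[20];
    UnsafeRow row = new UnsafeRow(1);
    row.pointTo(buffer, 20);   // after this patch: asserts sizeInBytes % 8 == 0

    // Before this patch the failure surfaced here instead, because hashCode()
    // hashes the row word by word and asserts a word-aligned length.
    System.out.println(row.hashCode());
  }
}
```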

## How was this patch tested?

Will add test cases

Author: Kazuaki Ishizaki <is...@jp.ibm.com>

Closes #18503 from kiszk/SPARK-21271.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ebbe589d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ebbe589d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ebbe589d

Branch: refs/heads/master
Commit: ebbe589d12434bc108672268bee05a7b7e571ee6
Parents: cfb25b2
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Authored: Thu Jul 27 15:27:24 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jul 27 15:27:24 2017 +0800

----------------------------------------------------------------------
 .../expressions/FixedLengthRowBasedKeyValueBatch.java   |  6 +++---
 .../spark/sql/catalyst/expressions/UnsafeRow.java       |  2 ++
 .../VariableLengthRowBasedKeyValueBatch.java            |  6 +++---
 .../spark/sql/execution/UnsafeExternalRowSorter.java    |  7 ++++---
 .../spark/sql/execution/UnsafeKVExternalSorter.java     |  6 +++---
 .../streaming/state/HDFSBackedStateStoreProvider.scala  | 12 ++++++++++--
 6 files changed, 25 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
index a88a315..df52f9c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
@@ -62,7 +62,7 @@ public final class FixedLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatc
 
     keyRowId = numRows;
     keyRow.pointTo(base, recordOffset, klen);
-    valueRow.pointTo(base, recordOffset + klen, vlen + 4);
+    valueRow.pointTo(base, recordOffset + klen, vlen);
     numRows++;
     return valueRow;
   }
@@ -95,7 +95,7 @@ public final class FixedLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatc
       getKeyRow(rowId);
     }
     assert(rowId >= 0);
-    valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen + 4);
+    valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen);
     return valueRow;
   }
 
@@ -131,7 +131,7 @@ public final class FixedLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatc
         }
 
         key.pointTo(base, offsetInPage, klen);
-        value.pointTo(base, offsetInPage + klen, vlen + 4);
+        value.pointTo(base, offsetInPage + klen, vlen);
 
         offsetInPage += recordLength;
         recordsInPage -= 1;

http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 56994fa..ec947d7 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -167,6 +167,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
    */
   public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
     assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
+    assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
     this.baseObject = baseObject;
     this.baseOffset = baseOffset;
     this.sizeInBytes = sizeInBytes;
@@ -183,6 +184,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
   }
 
   public void setTotalSize(int sizeInBytes) {
+    assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
     this.sizeInBytes = sizeInBytes;
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
index ea4f984..905e682 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
@@ -65,7 +65,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB
 
     keyRowId = numRows;
     keyRow.pointTo(base, recordOffset + 8, klen);
-    valueRow.pointTo(base, recordOffset + 8 + klen, vlen + 4);
+    valueRow.pointTo(base, recordOffset + 8 + klen, vlen);
     numRows++;
     return valueRow;
   }
@@ -102,7 +102,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB
     long offset = keyRow.getBaseOffset();
     int klen = keyRow.getSizeInBytes();
     int vlen = Platform.getInt(base, offset - 8) - klen - 4;
-    valueRow.pointTo(base, offset + klen, vlen + 4);
+    valueRow.pointTo(base, offset + klen, vlen);
     return valueRow;
   }
 
@@ -146,7 +146,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB
         currentvlen = totalLength - currentklen;
 
         key.pointTo(base, offsetInPage + 8, currentklen);
-        value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen + 4);
+        value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen);
 
         offsetInPage += 8 + totalLength + 8;
         recordsInPage -= 1;

http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index aadfcaa..53b0886 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -208,9 +208,10 @@ public final class UnsafeExternalRowSorter {
 
     @Override
     public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
-      // TODO: Why are the sizes -1?
-      row1.pointTo(baseObj1, baseOff1, -1);
-      row2.pointTo(baseObj2, baseOff2, -1);
+      // Note that since ordering doesn't need the total length of the record, we just pass 0
+      // into the row.
+      row1.pointTo(baseObj1, baseOff1, 0);
+      row2.pointTo(baseObj2, baseOff2, 0);
       return ordering.compare(row1, row2);
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index ee5bcfd..d8acf11 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -238,10 +238,10 @@ public final class UnsafeKVExternalSorter {
 
     @Override
     public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
-      // Note that since ordering doesn't need the total length of the record, we just pass -1
+      // Note that since ordering doesn't need the total length of the record, we just pass 0
       // into the row.
-      row1.pointTo(baseObj1, baseOff1 + 4, -1);
-      row2.pointTo(baseObj2, baseOff2 + 4, -1);
+      row1.pointTo(baseObj1, baseOff1 + 4, 0);
+      row2.pointTo(baseObj2, baseOff2 + 4, 0);
       return ordering.compare(row1, row2);
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index fa4c99c..e0c2e94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -369,7 +369,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
             val valueRowBuffer = new Array[Byte](valueSize)
             ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
-            valueRow.pointTo(valueRowBuffer, valueSize)
+            // If valueSize in the existing file is not a multiple of 8, floor it to a
+            // multiple of 8. This is a workaround for the following issue: prior to
+            // Spark 2.3, `RowBasedKeyValueBatch` mistakenly appended 4 bytes to the
+            // value row, and those bytes were persisted into the checkpoint data.
+            valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
             map.put(keyRow, valueRow)
           }
         }
@@ -433,7 +437,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
             val valueRowBuffer = new Array[Byte](valueSize)
             ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
-            valueRow.pointTo(valueRowBuffer, valueSize)
+            // If valueSize in the existing file is not a multiple of 8, floor it to a
+            // multiple of 8. This is a workaround for the following issue: prior to
+            // Spark 2.3, `RowBasedKeyValueBatch` mistakenly appended 4 bytes to the
+            // value row, and those bytes were persisted into the checkpoint data.
+            valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
             map.put(keyRow, valueRow)
           }
         }
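
For reference (not part of the commit), the flooring in the two hunks above simply drops the 4 stray trailing bytes that pre-2.3 writers persisted with each value row, on the assumption that the true row size is always word-aligned; the sizes below are hypothetical and purely illustrative:

```java
public class ValueSizeFlooring {
  public static void main(String[] args) {
    // Hypothetical example: a 24-byte value row that an older Spark persisted
    // with 4 extra bytes, so the checkpoint file records a size of 28.
    int persistedValueSize = 28;
    int flooredValueSize = (persistedValueSize / 8) * 8;
    System.out.println(flooredValueSize);   // 24, the word-aligned row size
  }
}
```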

