You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/09/17 15:22:29 UTC

spark git commit: [SPARK-17529][CORE] Implement BitSet.clearUntil and use it during merge joins

Repository: spark
Updated Branches:
  refs/heads/master 25cbbe6ca -> 9dbd4b864


[SPARK-17529][CORE] Implement BitSet.clearUntil and use it during merge joins

## What changes were proposed in this pull request?

Add a clearUntil() method on BitSet (adapted from the pre-existing setUntil() method).
Use this method to clear only the portion of the BitSet that is needed during merge joins.

## How was this patch tested?

Ran dev/run-tests, as well as performance tests on skewed data as described in the JIRA ticket (SPARK-17529).

I expect a small local performance hit from using BitSet.clearUntil rather than BitSet.clear for normally shaped (unskewed) joins, because clearUntil performs an additional read on the last long word. This is expected to be de minimis and was not specifically tested.

Author: David Navas <da...@clearstorydata.com>

Closes #15084 from davidnavas/bitSet.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9dbd4b86
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9dbd4b86
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9dbd4b86

Branch: refs/heads/master
Commit: 9dbd4b864efacd09a8353d00c998be87f9eeacb2
Parents: 25cbbe6
Author: David Navas <da...@clearstorydata.com>
Authored: Sat Sep 17 16:22:23 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat Sep 17 16:22:23 2016 +0100

----------------------------------------------------------------------
 .../apache/spark/util/collection/BitSet.scala   | 28 +++++++++++------
 .../spark/util/collection/BitSetSuite.scala     | 32 ++++++++++++++++++++
 .../sql/execution/joins/SortMergeJoinExec.scala |  4 +--
 3 files changed, 52 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9dbd4b86/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index 7ab67fc..e63e0e3 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util.collection
 
+import java.util.Arrays
+
 /**
  * A simple, fixed-size bit set implementation. This implementation is fast because it avoids
  * safety/bound checking.
@@ -35,21 +37,14 @@ class BitSet(numBits: Int) extends Serializable {
   /**
    * Clear all set bits.
    */
-  def clear(): Unit = {
-    var i = 0
-    while (i < numWords) {
-      words(i) = 0L
-      i += 1
-    }
-  }
+  def clear(): Unit = Arrays.fill(words, 0)
 
   /**
    * Set all the bits up to a given index
    */
-  def setUntil(bitIndex: Int) {
+  def setUntil(bitIndex: Int): Unit = {
     val wordIndex = bitIndex >> 6 // divide by 64
-    var i = 0
-    while(i < wordIndex) { words(i) = -1; i += 1 }
+    Arrays.fill(words, 0, wordIndex, -1)
     if(wordIndex < words.length) {
       // Set the remaining bits (note that the mask could still be zero)
       val mask = ~(-1L << (bitIndex & 0x3f))
@@ -58,6 +53,19 @@ class BitSet(numBits: Int) extends Serializable {
   }
 
   /**
+   * Clear all the bits up to a given index
+   */
+  def clearUntil(bitIndex: Int): Unit = {
+    val wordIndex = bitIndex >> 6 // divide by 64
+    Arrays.fill(words, 0, wordIndex, 0)
+    if(wordIndex < words.length) {
+      // Clear the remaining bits
+      val mask = -1L << (bitIndex & 0x3f)
+      words(wordIndex) &= mask
+    }
+  }
+
+  /**
    * Compute the bit-wise AND of the two sets returning the
    * result.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/9dbd4b86/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
index 69dbfa9..0169c99 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
@@ -152,4 +152,36 @@ class BitSetSuite extends SparkFunSuite {
     assert(bitsetDiff.nextSetBit(85) === 85)
     assert(bitsetDiff.nextSetBit(86) === -1)
   }
+
+  test( "[gs]etUntil" ) {
+    val bitSet = new BitSet(100)
+
+    bitSet.setUntil(bitSet.capacity)
+
+    (0 until bitSet.capacity).foreach { i =>
+      assert(bitSet.get(i))
+    }
+
+    bitSet.clearUntil(bitSet.capacity)
+
+    (0 until bitSet.capacity).foreach { i =>
+      assert(!bitSet.get(i))
+    }
+
+    val setUntil = bitSet.capacity / 2
+    bitSet.setUntil(setUntil)
+
+    val clearUntil = setUntil / 2
+    bitSet.clearUntil(clearUntil)
+
+    (0 until clearUntil).foreach { i =>
+      assert(!bitSet.get(i))
+    }
+    (clearUntil until setUntil).foreach { i =>
+      assert(bitSet.get(i))
+    }
+    (setUntil until bitSet.capacity).foreach { i =>
+      assert(!bitSet.get(i))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9dbd4b86/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index b46af2a..81b3e1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -954,12 +954,12 @@ private class SortMergeFullOuterJoinScanner(
     }
 
     if (leftMatches.size <= leftMatched.capacity) {
-      leftMatched.clear()
+      leftMatched.clearUntil(leftMatches.size)
     } else {
       leftMatched = new BitSet(leftMatches.size)
     }
     if (rightMatches.size <= rightMatched.capacity) {
-      rightMatched.clear()
+      rightMatched.clearUntil(rightMatches.size)
     } else {
       rightMatched = new BitSet(rightMatches.size)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org