Posted to commits@spark.apache.org by li...@apache.org on 2016/01/29 21:01:16 UTC

spark git commit: [SPARK-12818] Polishes spark-sketch module

Repository: spark
Updated Branches:
  refs/heads/master 5f686cc8b -> 2b027e9a3


[SPARK-12818] Polishes spark-sketch module

Fixes various minor code and Javadoc styling issues.

Author: Cheng Lian <li...@databricks.com>

Closes #10985 from liancheng/sketch-polishing.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b027e9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b027e9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b027e9a

Branch: refs/heads/master
Commit: 2b027e9a386fe4009f61ad03b169335af5a9a5c6
Parents: 5f686cc
Author: Cheng Lian <li...@databricks.com>
Authored: Fri Jan 29 12:01:13 2016 -0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Fri Jan 29 12:01:13 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/util/sketch/BitArray.java  |   2 +-
 .../apache/spark/util/sketch/BloomFilter.java   | 111 ++++++++++---------
 .../spark/util/sketch/BloomFilterImpl.java      |  40 +++----
 .../spark/util/sketch/CountMinSketch.java       |  26 +++--
 .../spark/util/sketch/CountMinSketchImpl.java   |  12 ++
 .../org/apache/spark/util/sketch/Utils.java     |   2 +-
 6 files changed, 110 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2b027e9a/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java
----------------------------------------------------------------------
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java
index 2a0484e..480a0a7 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java
@@ -22,7 +22,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 
-public final class BitArray {
+final class BitArray {
   private final long[] data;
   private long bitCount;
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b027e9a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
----------------------------------------------------------------------
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
index 81772fc..c0b425e 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
@@ -22,16 +22,10 @@ import java.io.InputStream;
 import java.io.OutputStream;
 
 /**
- * A Bloom filter is a space-efficient probabilistic data structure, that is used to test whether
- * an element is a member of a set. It returns false when the element is definitely not in the
- * set, returns true when the element is probably in the set.
- *
- * Internally a Bloom filter is initialized with 2 information: how many space to use(number of
- * bits) and how many hash values to calculate for each record.  To get as lower false positive
- * probability as possible, user should call {@link BloomFilter#create} to automatically pick a
- * best combination of these 2 parameters.
- *
- * Currently the following data types are supported:
+ * A Bloom filter is a space-efficient probabilistic data structure that offers an approximate
+ * containment test with one-sided error: if it claims that an item is contained in it, this
+ * might be in error, but if it claims that an item is <i>not</i> contained in it, then this is
+ * definitely true. Currently supported data types include:
  * <ul>
  *   <li>{@link Byte}</li>
  *   <li>{@link Short}</li>
@@ -39,14 +33,17 @@ import java.io.OutputStream;
  *   <li>{@link Long}</li>
  *   <li>{@link String}</li>
  * </ul>
+ * The false positive probability ({@code FPP}) of a Bloom filter is defined as the probability that
+ * {@linkplain #mightContain(Object)} will erroneously return {@code true} for an object that has
+ * not actually been put in the {@code BloomFilter}.
  *
- * The implementation is largely based on the {@code BloomFilter} class from guava.
+ * The implementation is largely based on the {@code BloomFilter} class from Guava.
  */
 public abstract class BloomFilter {
 
   public enum Version {
     /**
-     * {@code BloomFilter} binary format version 1 (all values written in big-endian order):
+     * {@code BloomFilter} binary format version 1. All values written in big-endian order:
      * <ul>
      *   <li>Version number, always 1 (32 bit)</li>
      *   <li>Number of hash functions (32 bit)</li>
@@ -68,14 +65,13 @@ public abstract class BloomFilter {
   }
 
   /**
-   * Returns the false positive probability, i.e. the probability that
-   * {@linkplain #mightContain(Object)} will erroneously return {@code true} for an object that
-   * has not actually been put in the {@code BloomFilter}.
+   * Returns the probability that {@linkplain #mightContain(Object)} erroneously return {@code true}
+   * for an object that has not actually been put in the {@code BloomFilter}.
    *
-   * <p>Ideally, this number should be close to the {@code fpp} parameter
-   * passed in to create this bloom filter, or smaller. If it is
-   * significantly higher, it is usually the case that too many elements (more than
-   * expected) have been put in the {@code BloomFilter}, degenerating it.
+   * Ideally, this number should be close to the {@code fpp} parameter passed in
+   * {@linkplain #create(long, double)}, or smaller. If it is significantly higher, it is usually
+   * the case that too many items (more than expected) have been put in the {@code BloomFilter},
+   * degenerating it.
    */
   public abstract double expectedFpp();
 
@@ -85,8 +81,8 @@ public abstract class BloomFilter {
   public abstract long bitSize();
 
   /**
-   * Puts an element into this {@code BloomFilter}. Ensures that subsequent invocations of
-   * {@link #mightContain(Object)} with the same element will always return {@code true}.
+   * Puts an item into this {@code BloomFilter}. Ensures that subsequent invocations of
+   * {@linkplain #mightContain(Object)} with the same item will always return {@code true}.
    *
    * @return true if the bloom filter's bits changed as a result of this operation. If the bits
    *     changed, this is <i>definitely</i> the first time {@code object} has been added to the
@@ -98,19 +94,19 @@ public abstract class BloomFilter {
   public abstract boolean put(Object item);
 
   /**
-   * A specialized variant of {@link #put(Object)}, that can only be used to put utf-8 string.
+   * A specialized variant of {@link #put(Object)} that only supports {@code String} items.
    */
-  public abstract boolean putString(String str);
+  public abstract boolean putString(String item);
 
   /**
-   * A specialized variant of {@link #put(Object)}, that can only be used to put long.
+   * A specialized variant of {@link #put(Object)} that only supports {@code long} items.
    */
-  public abstract boolean putLong(long l);
+  public abstract boolean putLong(long item);
 
   /**
-   * A specialized variant of {@link #put(Object)}, that can only be used to put byte array.
+   * A specialized variant of {@link #put(Object)} that only supports byte array items.
    */
-  public abstract boolean putBinary(byte[] bytes);
+  public abstract boolean putBinary(byte[] item);
 
   /**
    * Determines whether a given bloom filter is compatible with this bloom filter. For two
@@ -137,38 +133,36 @@ public abstract class BloomFilter {
   public abstract boolean mightContain(Object item);
 
   /**
-   * A specialized variant of {@link #mightContain(Object)}, that can only be used to test utf-8
-   * string.
+   * A specialized variant of {@link #mightContain(Object)} that only tests {@code String} items.
    */
-  public abstract boolean mightContainString(String str);
+  public abstract boolean mightContainString(String item);
 
   /**
-   * A specialized variant of {@link #mightContain(Object)}, that can only be used to test long.
+   * A specialized variant of {@link #mightContain(Object)} that only tests {@code long} items.
    */
-  public abstract boolean mightContainLong(long l);
+  public abstract boolean mightContainLong(long item);
 
   /**
-   * A specialized variant of {@link #mightContain(Object)}, that can only be used to test byte
-   * array.
+   * A specialized variant of {@link #mightContain(Object)} that only tests byte array items.
    */
-  public abstract boolean mightContainBinary(byte[] bytes);
+  public abstract boolean mightContainBinary(byte[] item);
 
   /**
-   * Writes out this {@link BloomFilter} to an output stream in binary format.
-   * It is the caller's responsibility to close the stream.
+   * Writes out this {@link BloomFilter} to an output stream in binary format. It is the caller's
+   * responsibility to close the stream.
    */
   public abstract void writeTo(OutputStream out) throws IOException;
 
   /**
-   * Reads in a {@link BloomFilter} from an input stream.
-   * It is the caller's responsibility to close the stream.
+   * Reads in a {@link BloomFilter} from an input stream. It is the caller's responsibility to close
+   * the stream.
    */
   public static BloomFilter readFrom(InputStream in) throws IOException {
     return BloomFilterImpl.readFrom(in);
   }
 
   /**
-   * Computes the optimal k (number of hashes per element inserted in Bloom filter), given the
+   * Computes the optimal k (number of hashes per item inserted in Bloom filter), given the
    * expected insertions and total number of bits in the Bloom filter.
    *
    * See http://en.wikipedia.org/wiki/File:Bloom_filter_fp_probability.svg for the formula.
@@ -197,21 +191,31 @@ public abstract class BloomFilter {
   static final double DEFAULT_FPP = 0.03;
 
   /**
-   * Creates a {@link BloomFilter} with given {@code expectedNumItems} and the default {@code fpp}.
+   * Creates a {@link BloomFilter} with the expected number of insertions and a default expected
+   * false positive probability of 3%.
+   *
+   * Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
+   * will result in its saturation, and a sharp deterioration of its false positive probability.
    */
   public static BloomFilter create(long expectedNumItems) {
     return create(expectedNumItems, DEFAULT_FPP);
   }
 
   /**
-   * Creates a {@link BloomFilter} with given {@code expectedNumItems} and {@code fpp}, it will pick
-   * an optimal {@code numBits} and {@code numHashFunctions} for the bloom filter.
+   * Creates a {@link BloomFilter} with the expected number of insertions and expected false
+   * positive probability.
+   *
+   * Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
+   * will result in its saturation, and a sharp deterioration of its false positive probability.
    */
   public static BloomFilter create(long expectedNumItems, double fpp) {
-    assert fpp > 0.0 : "False positive probability must be > 0.0";
-    assert fpp < 1.0 : "False positive probability must be < 1.0";
-    long numBits = optimalNumOfBits(expectedNumItems, fpp);
-    return create(expectedNumItems, numBits);
+    if (fpp <= 0D || fpp >= 1D) {
+      throw new IllegalArgumentException(
+        "False positive probability must be within range (0.0, 1.0)"
+      );
+    }
+
+    return create(expectedNumItems, optimalNumOfBits(expectedNumItems, fpp));
   }
 
   /**
@@ -219,9 +223,14 @@ public abstract class BloomFilter {
    * pick an optimal {@code numHashFunctions} which can minimize {@code fpp} for the bloom filter.
    */
   public static BloomFilter create(long expectedNumItems, long numBits) {
-    assert expectedNumItems > 0 : "Expected insertions must be > 0";
-    assert numBits > 0 : "number of bits must be > 0";
-    int numHashFunctions = optimalNumOfHashFunctions(expectedNumItems, numBits);
-    return new BloomFilterImpl(numHashFunctions, numBits);
+    if (expectedNumItems <= 0) {
+      throw new IllegalArgumentException("Expected insertions must be positive");
+    }
+
+    if (numBits <= 0) {
+      throw new IllegalArgumentException("Number of bits must be positive");
+    }
+
+    return new BloomFilterImpl(optimalNumOfHashFunctions(expectedNumItems, numBits), numBits);
   }
 }
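
For context, a minimal usage sketch of the public BloomFilter API whose Javadoc is polished
above. It exercises only methods that appear in this diff (create, the put/mightContain
variants, expectedFpp, writeTo/readFrom); it is an illustration, not part of the commit, and
the values in the comments are expectations rather than guaranteed output:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.spark.util.sketch.BloomFilter;

    public class BloomFilterExample {
      public static void main(String[] args) throws IOException {
        // Expect about 1000 insertions with the default ~3% false positive probability.
        BloomFilter filter = BloomFilter.create(1000);

        filter.putString("spark");
        filter.putLong(42L);
        filter.putBinary(new byte[] {1, 2, 3});

        // One-sided error: a negative answer is always correct, a positive answer
        // may occasionally be a false positive.
        System.out.println(filter.mightContainString("spark"));  // true
        System.out.println(filter.mightContainString("flink"));  // false (or, rarely, a false positive)
        System.out.println(filter.expectedFpp());                // well below 0.03 while under-filled

        // Binary round trip through the format described by Version.V1.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        filter.writeTo(out);
        BloomFilter copy = BloomFilter.readFrom(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(copy.mightContainLong(42L));          // true

        // The factories now throw IllegalArgumentException instead of relying on asserts.
        try {
          BloomFilter.create(1000, 1.5);
        } catch (IllegalArgumentException e) {
          System.out.println(e.getMessage());
        }
      }
    }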

http://git-wip-us.apache.org/repos/asf/spark/blob/2b027e9a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
----------------------------------------------------------------------
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
index 35107e0..92c28bc 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
@@ -19,9 +19,10 @@ package org.apache.spark.util.sketch;
 
 import java.io.*;
 
-public class BloomFilterImpl extends BloomFilter implements Serializable {
+class BloomFilterImpl extends BloomFilter implements Serializable {
 
   private int numHashFunctions;
+
   private BitArray bits;
 
   BloomFilterImpl(int numHashFunctions, long numBits) {
@@ -77,14 +78,14 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
   }
 
   @Override
-  public boolean putString(String str) {
-    return putBinary(Utils.getBytesFromUTF8String(str));
+  public boolean putString(String item) {
+    return putBinary(Utils.getBytesFromUTF8String(item));
   }
 
   @Override
-  public boolean putBinary(byte[] bytes) {
-    int h1 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, 0);
-    int h2 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, h1);
+  public boolean putBinary(byte[] item) {
+    int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
+    int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);
 
     long bitSize = bits.bitSize();
     boolean bitsChanged = false;
@@ -100,14 +101,14 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
   }
 
   @Override
-  public boolean mightContainString(String str) {
-    return mightContainBinary(Utils.getBytesFromUTF8String(str));
+  public boolean mightContainString(String item) {
+    return mightContainBinary(Utils.getBytesFromUTF8String(item));
   }
 
   @Override
-  public boolean mightContainBinary(byte[] bytes) {
-    int h1 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, 0);
-    int h2 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, h1);
+  public boolean mightContainBinary(byte[] item) {
+    int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
+    int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);
 
     long bitSize = bits.bitSize();
     for (int i = 1; i <= numHashFunctions; i++) {
@@ -124,14 +125,14 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
   }
 
   @Override
-  public boolean putLong(long l) {
+  public boolean putLong(long item) {
     // Here we first hash the input long element into 2 int hash values, h1 and h2, then produce n
     // hash values by `h1 + i * h2` with 1 <= i <= numHashFunctions.
     // Note that `CountMinSketch` use a different strategy, it hash the input long element with
     // every i to produce n hash values.
     // TODO: the strategy of `CountMinSketch` looks more advanced, should we follow it here?
-    int h1 = Murmur3_x86_32.hashLong(l, 0);
-    int h2 = Murmur3_x86_32.hashLong(l, h1);
+    int h1 = Murmur3_x86_32.hashLong(item, 0);
+    int h2 = Murmur3_x86_32.hashLong(item, h1);
 
     long bitSize = bits.bitSize();
     boolean bitsChanged = false;
@@ -147,9 +148,9 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
   }
 
   @Override
-  public boolean mightContainLong(long l) {
-    int h1 = Murmur3_x86_32.hashLong(l, 0);
-    int h2 = Murmur3_x86_32.hashLong(l, h1);
+  public boolean mightContainLong(long item) {
+    int h1 = Murmur3_x86_32.hashLong(item, 0);
+    int h2 = Murmur3_x86_32.hashLong(item, h1);
 
     long bitSize = bits.bitSize();
     for (int i = 1; i <= numHashFunctions; i++) {
@@ -197,7 +198,7 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
       throw new IncompatibleMergeException("Cannot merge null bloom filter");
     }
 
-    if (!(other instanceof BloomFilter)) {
+    if (!(other instanceof BloomFilterImpl)) {
       throw new IncompatibleMergeException(
         "Cannot merge bloom filter of class " + other.getClass().getName()
       );
@@ -211,7 +212,8 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
 
     if (this.numHashFunctions != that.numHashFunctions) {
       throw new IncompatibleMergeException(
-        "Cannot merge bloom filters with different number of hash functions");
+        "Cannot merge bloom filters with different number of hash functions"
+      );
     }
 
     this.bits.putAll(that.bits);
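
The double hashing scheme referenced in the putLong comment above can be shown in isolation.
The following sketch substitutes a toy hash for the internal Murmur3_x86_32 helper (which is
not public API), and the sign flip plus modulo used to map the combined hash onto a bit index
is an assumption about the loop body that this diff does not display:

    // Simplified illustration of the `h1 + i * h2` strategy described in BloomFilterImpl.
    // NOT the real implementation: toyHash stands in for Murmur3_x86_32.hashLong, and the
    // index-mapping step is assumed for the sake of a runnable example.
    public class DoubleHashingSketch {

      // Toy 32-bit hash taking a seed, in the spirit of hashLong(item, seed).
      static int toyHash(long item, int seed) {
        long x = item * 0x9E3779B97F4A7C15L + seed;
        x ^= (x >>> 31);
        return (int) x;
      }

      // Derive numHashFunctions probe positions in [0, bitSize) from two base hashes.
      static long[] probePositions(long item, int numHashFunctions, long bitSize) {
        int h1 = toyHash(item, 0);
        int h2 = toyHash(item, h1);
        long[] positions = new long[numHashFunctions];
        for (int i = 1; i <= numHashFunctions; i++) {
          int combined = h1 + i * h2;
          if (combined < 0) {
            combined = ~combined;  // keep the index non-negative
          }
          positions[i - 1] = combined % bitSize;
        }
        return positions;
      }

      public static void main(String[] args) {
        for (long pos : probePositions(42L, 5, 1 << 20)) {
          System.out.println(pos);
        }
      }
    }

Two filters built this way can only be merged when they agree on the number of hash functions,
which is what the merge compatibility check above enforces.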

http://git-wip-us.apache.org/repos/asf/spark/blob/2b027e9a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java
----------------------------------------------------------------------
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java
index f0aac5b..48f9868 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java
@@ -22,7 +22,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 
 /**
- * A Count-Min sketch is a probabilistic data structure used for summarizing streams of data in
+ * A Count-min sketch is a probabilistic data structure used for summarizing streams of data in
  * sub-linear space.  Currently, supported data types include:
  * <ul>
  *   <li>{@link Byte}</li>
@@ -31,8 +31,7 @@ import java.io.OutputStream;
  *   <li>{@link Long}</li>
  *   <li>{@link String}</li>
  * </ul>
- * Each {@link CountMinSketch} is initialized with a random seed, and a pair
- * of parameters:
+ * A {@link CountMinSketch} is initialized with a random seed, and a pair of parameters:
  * <ol>
  *   <li>relative error (or {@code eps}), and
  *   <li>confidence (or {@code delta})
@@ -49,16 +48,13 @@ import java.io.OutputStream;
  *   <li>{@code w = ceil(-log(1 - confidence) / log(2))}</li>
  * </ul>
  *
- * See http://www.eecs.harvard.edu/~michaelm/CS222/countmin.pdf for technical details,
- * including proofs of the estimates and error bounds used in this implementation.
- *
  * This implementation is largely based on the {@code CountMinSketch} class from stream-lib.
  */
 abstract public class CountMinSketch {
 
   public enum Version {
     /**
-     * {@code CountMinSketch} binary format version 1 (all values written in big-endian order):
+     * {@code CountMinSketch} binary format version 1.  All values written in big-endian order:
      * <ul>
      *   <li>Version number, always 1 (32 bit)</li>
      *   <li>Total count of added items (64 bit)</li>
@@ -172,14 +168,14 @@ abstract public class CountMinSketch {
       throws IncompatibleMergeException;
 
   /**
-   * Writes out this {@link CountMinSketch} to an output stream in binary format.
-   * It is the caller's responsibility to close the stream.
+   * Writes out this {@link CountMinSketch} to an output stream in binary format. It is the caller's
+   * responsibility to close the stream.
    */
   public abstract void writeTo(OutputStream out) throws IOException;
 
   /**
-   * Reads in a {@link CountMinSketch} from an input stream.
-   * It is the caller's responsibility to close the stream.
+   * Reads in a {@link CountMinSketch} from an input stream. It is the caller's responsibility to
+   * close the stream.
    */
   public static CountMinSketch readFrom(InputStream in) throws IOException {
     return CountMinSketchImpl.readFrom(in);
@@ -188,6 +184,10 @@ abstract public class CountMinSketch {
   /**
    * Creates a {@link CountMinSketch} with given {@code depth}, {@code width}, and random
    * {@code seed}.
+   *
+   * @param depth depth of the Count-min Sketch, must be positive
+   * @param width width of the Count-min Sketch, must be positive
+   * @param seed random seed
    */
   public static CountMinSketch create(int depth, int width, int seed) {
     return new CountMinSketchImpl(depth, width, seed);
@@ -196,6 +196,10 @@ abstract public class CountMinSketch {
   /**
    * Creates a {@link CountMinSketch} with given relative error ({@code eps}), {@code confidence},
    * and random {@code seed}.
+   *
+   * @param eps relative error, must be positive
+   * @param confidence confidence, must be positive and less than 1.0
+   * @param seed random seed
    */
   public static CountMinSketch create(double eps, double confidence, int seed) {
     return new CountMinSketchImpl(eps, confidence, seed);
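
A short usage sketch for the CountMinSketch factories documented above. The add and
estimateCount calls belong to the same class but fall outside the hunks shown in this diff, so
treat them as assumed context; the sketch is an illustration, not part of the commit:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.spark.util.sketch.CountMinSketch;

    public class CountMinSketchExample {
      public static void main(String[] args) throws IOException {
        // eps = 0.001, confidence = 0.99: estimates stay within eps * totalCount of the
        // true frequency with probability at least 0.99, and never underestimate.
        CountMinSketch sketch = CountMinSketch.create(0.001, 0.99, 42);

        for (int i = 0; i < 1000; i++) {
          sketch.add("user-" + (i % 10));  // each of 10 keys is added 100 times
        }

        System.out.println(sketch.estimateCount("user-3"));  // >= 100, usually exactly 100

        // Binary round trip through the format described by Version.V1.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        sketch.writeTo(out);
        CountMinSketch copy =
          CountMinSketch.readFrom(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(copy.estimateCount("user-3"));
      }
    }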

http://git-wip-us.apache.org/repos/asf/spark/blob/2b027e9a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java
----------------------------------------------------------------------
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java
index c0631c6..2acbb24 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java
@@ -42,6 +42,10 @@ class CountMinSketchImpl extends CountMinSketch implements Serializable {
   private CountMinSketchImpl() {}
 
   CountMinSketchImpl(int depth, int width, int seed) {
+    if (depth <= 0 || width <= 0) {
+      throw new IllegalArgumentException("Depth and width must be both positive");
+    }
+
     this.depth = depth;
     this.width = width;
     this.eps = 2.0 / width;
@@ -50,6 +54,14 @@ class CountMinSketchImpl extends CountMinSketch implements Serializable {
   }
 
   CountMinSketchImpl(double eps, double confidence, int seed) {
+    if (eps <= 0D) {
+      throw new IllegalArgumentException("Relative error must be positive");
+    }
+
+    if (confidence <= 0D || confidence >= 1D) {
+      throw new IllegalArgumentException("Confidence must be within range (0.0, 1.0)");
+    }
+
     // 2/w = eps ; w = 2/eps
     // 1/2^depth <= 1-confidence ; depth >= -log2 (1-confidence)
     this.eps = eps;
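
The two comment lines above describe how the (eps, confidence) constructor chooses the table
dimensions. A standalone rendering of that arithmetic follows; the Math.ceil rounding is an
assumption about the constructor body that this diff does not show, kept consistent with the
positivity check added for the (depth, width) constructor:

    // Mirrors the formulas in the comments of CountMinSketchImpl(double eps, double confidence, int seed):
    //   2 / w = eps               =>  w = 2 / eps
    //   1 / 2^d <= 1 - confidence =>  d >= -log2(1 - confidence)
    public class CmsDimensions {
      public static void main(String[] args) {
        double eps = 0.001;
        double confidence = 0.99;

        int width = (int) Math.ceil(2 / eps);                                  // 2000
        int depth = (int) Math.ceil(-Math.log(1 - confidence) / Math.log(2));  // ceil(6.64...) = 7

        System.out.println("width = " + width + ", depth = " + depth);
      }
    }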

http://git-wip-us.apache.org/repos/asf/spark/blob/2b027e9a/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java
----------------------------------------------------------------------
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java
index a6b3331..feb601d 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java
@@ -19,7 +19,7 @@ package org.apache.spark.util.sketch;
 
 import java.io.UnsupportedEncodingException;
 
-public class Utils {
+class Utils {
   public static byte[] getBytesFromUTF8String(String str) {
     try {
       return str.getBytes("utf-8");

