You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2011/11/03 22:44:26 UTC

svn commit: r1197318 - in /pig/branches/branch-0.10: ./ src/org/apache/pig/builtin/ test/e2e/pig/tests/ test/org/apache/pig/test/

Author: gates
Date: Thu Nov  3 21:44:25 2011
New Revision: 1197318

URL: http://svn.apache.org/viewvc?rev=1197318&view=rev
Log:
PIG-2328: Add builtin UDFs for building and using bloom filters

Added:
    pig/branches/branch-0.10/src/org/apache/pig/builtin/Bloom.java
    pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloom.java
    pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloomBase.java
    pig/branches/branch-0.10/test/org/apache/pig/test/TestBloom.java
Modified:
    pig/branches/branch-0.10/CHANGES.txt
    pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf

Modified: pig/branches/branch-0.10/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/CHANGES.txt?rev=1197318&r1=1197317&r2=1197318&view=diff
==============================================================================
--- pig/branches/branch-0.10/CHANGES.txt (original)
+++ pig/branches/branch-0.10/CHANGES.txt Thu Nov  3 21:44:25 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2328: Add builtin UDFs for building and using bloom filters (gates)
+
 PIG-2334: Set default number of reducers for S3N filesystem (ddaniels888 via daijy)
 
 PIG-1387: Syntactical Sugar for PIG-1385 (azaroth)

Added: pig/branches/branch-0.10/src/org/apache/pig/builtin/Bloom.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/builtin/Bloom.java?rev=1197318&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/builtin/Bloom.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/builtin/Bloom.java Thu Nov  3 21:44:25 2011
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+
+import org.apache.pig.FilterFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Use a Bloom filter built previously by BuildBloom.  You would first
+ * build a bloom filter with BuildBloom, which is intended to run
+ * in a group all job.  For example:
+ * define bb BuildBloom('jenkins', '100', '0.1');
+ * A = load 'foo' as (x, y);
+ * B = group A all;
+ * C = foreach B generate bb(A.x);
+ * store C into 'mybloom';
+ * The bloom filter can be on multiple keys by passing more than one field
+ * (or the entire bag) to BuildBloom.
+ * The resulting file can then be used in a Bloom filter as:
+ * define bloom Bloom('mybloom');
+ * A = load 'foo' as (x, y);
+ * B = load 'bar' as (z);
+ * C = filter B by bloom(z);
+ * D = join C by z, A by x;
+ * It uses {@link org.apache.hadoop.util.bloom.BloomFilter}.
+ */
+public class Bloom extends FilterFunc {
+
+    // Name of the file holding the serialized filter, as passed to the
+    // constructor.
+    private String bloomFile;
+    // Loaded lazily by exec(), or set directly through setFilter().
+    public BloomFilter filter = null;
+
+    /**
+     * @param filename file containing the serialized Bloom filter
+     */
+    public Bloom(String filename) {
+        bloomFile = filename;
+    }
+
+    /**
+     * Test whether the key in the input tuple is in the Bloom filter.  The
+     * filter is read from disk the first time this is called.
+     * @param input tuple holding the key.  If it has exactly one field that
+     * field is the key, otherwise the whole tuple is serialized as the key.
+     * @return the result of the filter's membership test for the key.
+     * @throws IOException if the filter file cannot be read.
+     */
+    @Override
+    public Boolean exec(Tuple input) throws IOException {
+        if (filter == null) {
+            init();
+        }
+        byte[] b;
+        if (input.size() == 1) b = DataType.toBytes(input.get(0));
+        else b = DataType.toBytes(input, DataType.TUPLE);
+
+        Key k = new Key(b);
+        return filter.membershipTest(k);
+    }
+
+    /**
+     * @return a single entry of the form <i>bloomFile</i>#<i>localName</i>,
+     * where localName is bloomFile with every "/" replaced by "_".
+     */
+    @Override
+    public List<String> getCacheFiles() {
+        List<String> list = new ArrayList<String>(1);
+        // We were passed the name of the file on HDFS.  Append a
+        // name for the file on the task node.
+        list.add(bloomFile + "#" + getFilenameFromPath(bloomFile));
+        return list;
+    }
+
+    /**
+     * Read the filter from part-r-00000 under the local copy of bloomFile.
+     */
+    private void init() throws IOException {
+        String dcFile = "./" + getFilenameFromPath(bloomFile) +
+            "/part-r-00000";
+        DataInputStream dis =
+            new DataInputStream(new FileInputStream(dcFile));
+        try {
+            // Only publish the filter once it has been read completely.
+            // Otherwise a failed read would leave a non-null but unloaded
+            // filter behind and later calls to exec() would skip init().
+            BloomFilter loaded = new BloomFilter();
+            loaded.readFields(dis);
+            filter = loaded;
+        } finally {
+            // Always release the file handle, even if the read fails.
+            dis.close();
+        }
+    }
+
+    /**
+     * For testing only, do not use directly.
+     */
+    public void setFilter(DataByteArray dba) throws IOException {
+        DataInputStream dis = new DataInputStream(new
+            ByteArrayInputStream(dba.get()));
+        filter = new BloomFilter();
+        filter.readFields(dis);
+    }
+
+    // Flatten a path into a single file name by turning "/" into "_".
+    private String getFilenameFromPath(String p) {
+        return p.replace("/", "_");
+    }
+
+}

Added: pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloom.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloom.java?rev=1197318&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloom.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloom.java Thu Nov  3 21:44:25 2011
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Build a bloom filter for use later in Bloom.  This UDF is intended to run
+ * in a group all job.  For example:
+ * define bb BuildBloom('jenkins', '100', '0.1');
+ * A = load 'foo' as (x, y);
+ * B = group A all;
+ * C = foreach B generate bb(A.x);
+ * store C into 'mybloom';
+ * The bloom filter can be on multiple keys by passing more than one field
+ * (or the entire bag) to BuildBloom.
+ * The resulting file can then be used in a Bloom filter as:
+ * define bloom Bloom('mybloom');
+ * A = load 'foo' as (x, y);
+ * B = load 'bar' as (z);
+ * C = filter B by bloom(z);
+ * D = join C by z, A by x;
+ * It uses {@link org.apache.hadoop.util.bloom.BloomFilter}.
+ */
+public class BuildBloom extends BuildBloomBase<DataByteArray> implements Algebraic {
+
+    /**
+     * Build a bloom filter of fixed size and number of hash functions.
+     * @param hashType type of the hashing function (see
+     * {@link org.apache.hadoop.util.hash.Hash}).
+     * @param mode Will be ignored, though by convention it should be
+     * "fixed" or "fixedsize"
+     * @param vectorSize The vector size of this filter.
+     * @param nbHash The number of hash functions to consider.
+     */
+    public BuildBloom(String hashType,
+                      String mode,
+                      String vectorSize,
+                      String nbHash) {
+        super(hashType, mode, vectorSize, nbHash);
+    }
+
+    /**
+     * Construct a Bloom filter based on expected number of elements and
+     * desired accuracy.
+     * @param hashType type of the hashing function (see
+     * {@link org.apache.hadoop.util.hash.Hash}).
+     * @param numElements The number of distinct elements expected to be
+     * placed in this filter.
+     * @param desiredFalsePositive the acceptable rate of false positives.
+     * This should be a floating point value between 0 and 1.0, where 1.0
+     * would be 100% (ie, a totally useless filter).
+     */
+    public BuildBloom(String hashType,
+                      String numElements,
+                      String desiredFalsePositive) {
+        super(hashType, numElements, desiredFalsePositive);
+    }
+
+    /**
+     * Not supported: this function only works through its algebraic
+     * stages (Initial, Intermediate and Final).
+     * @throws IOException always.
+     */
+    @Override
+    public DataByteArray exec(Tuple input) throws IOException {
+        throw new IOException("This must be used with algebraic!");
+    }
+
+    /** @return class name of the initial stage, {@link Initial}. */
+    public String getInitial() {
+        return Initial.class.getName();
+    }
+
+    /** @return class name of the intermediate stage, {@link Intermediate}. */
+    public String getIntermed() {
+        return Intermediate.class.getName();
+    }
+
+    /** @return class name of the final stage, {@link Final}. */
+    public String getFinal() {
+        return Final.class.getName();
+    }
+
+    /**
+     * Initial stage: turns the tuples of the input bag into a serialized
+     * bloom filter wrapped in a tuple.
+     */
+    static public class Initial extends BuildBloomBase<Tuple> {
+
+        public Initial() {
+        }
+
+        public Initial(String hashType,
+                       String mode,
+                       String vectorSize,
+                       String nbHash ) {
+            super(hashType, mode, vectorSize, nbHash);
+        }
+
+        public Initial(String hashType,
+                       String numElements,
+                       String desiredFalsePositive) {
+            super(hashType, numElements, desiredFalsePositive);
+        }
+
+        /**
+         * @param input tuple whose first field is a bag of key tuples.
+         * @return a tuple holding the serialized filter, or null if input
+         * is null or has no fields.
+         */
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            if (input == null || input.size() == 0) return null;
+
+            // Strip off the initial level of bag
+            DataBag values = (DataBag)input.get(0);
+            filter = new BloomFilter(vSize, numHash, hType);
+
+            // Add every tuple in the bag rather than only the first one,
+            // so that no key is silently dropped and an empty bag simply
+            // yields an empty filter instead of failing in it.next().
+            for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+                Tuple t = it.next();
+
+                // If the input tuple has only one field, then we'll extract
+                // that field and serialize it into a key.  If it has
+                // multiple fields, we'll serialize the whole tuple.
+                byte[] b;
+                if (t.size() == 1) b = DataType.toBytes(t.get(0));
+                else b = DataType.toBytes(t, DataType.TUPLE);
+
+                filter.add(new Key(b));
+            }
+
+            return TupleFactory.getInstance().newTuple(bloomOut());
+        }
+    }
+
+    /**
+     * Intermediate stage: ORs a bag of serialized filters together and
+     * returns the combined filter wrapped in a tuple.
+     */
+    static public class Intermediate extends BuildBloomBase<Tuple> {
+
+        public Intermediate() {
+        }
+
+        public Intermediate(String hashType,
+                            String mode,
+                            String vectorSize,
+                            String nbHash ) {
+            super(hashType, mode, vectorSize, nbHash);
+        }
+
+        public Intermediate(String hashType,
+                            String numElements,
+                            String desiredFalsePositive) {
+            super(hashType, numElements, desiredFalsePositive);
+        }
+
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            return TupleFactory.getInstance().newTuple(bloomOr(input));
+        }
+    }
+
+    /**
+     * Final stage: ORs a bag of serialized filters together and returns
+     * the combined filter as a bytearray.
+     */
+    static public class Final extends BuildBloomBase<DataByteArray> {
+
+        public Final() {
+        }
+
+        public Final(String hashType,
+                     String mode,
+                     String vectorSize,
+                     String nbHash ) {
+            super(hashType, mode, vectorSize, nbHash);
+        }
+
+        public Final(String hashType,
+                     String numElements,
+                     String desiredFalsePositive) {
+            super(hashType, numElements, desiredFalsePositive);
+        }
+
+        @Override
+        public DataByteArray exec(Tuple input) throws IOException {
+            return bloomOr(input);
+        }
+    }
+
+    /** @return a schema with a single unnamed bytearray field. */
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(null, DataType.BYTEARRAY)); 
+    }
+
+}

Added: pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloomBase.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloomBase.java?rev=1197318&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloomBase.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloomBase.java Thu Nov  3 21:44:25 2011
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Iterator;
+
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.hash.Hash;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+/**
+ * A Base class for BuildBloom and its Algebraic implementations.
+ */
+public abstract class BuildBloomBase<T> extends EvalFunc<T> {
+
+    // Parameters handed to the BloomFilter constructor: vector size,
+    // number of hash functions and hash type constant.
+    protected int vSize;
+    protected int numHash;
+    protected int hType;
+    protected BloomFilter filter;
+
+    protected BuildBloomBase() {
+    }
+
+    /**
+     * @param hashType type of the hashing function (see
+     * {@link org.apache.hadoop.util.hash.Hash}).
+     * @param mode Will be ignored, though by convention it should be
+     * "fixed" or "fixedsize"
+     * @param vectorSize The vector size of <i>this</i> filter.
+     * @param nbHash The number of hash functions to consider.
+     */
+    public BuildBloomBase(String hashType,
+                          String mode,
+                          String vectorSize,
+                          String nbHash) {
+        vSize = Integer.parseInt(vectorSize);
+        numHash = Integer.parseInt(nbHash);
+        hType = convertHashType(hashType);
+    }
+
+    /**
+     * @param hashType type of the hashing function (see
+     * {@link org.apache.hadoop.util.hash.Hash}).
+     * @param numElements The number of distinct elements expected to be
+     * placed in this filter.
+     * @param desiredFalsePositive the acceptable rate of false positives.
+     * This should be a floating point value between 0 and 1.0, where 1.0
+     * would be 100% (ie, a totally useless filter).
+     */
+    public BuildBloomBase(String hashType,
+                          String numElements,
+                          String desiredFalsePositive) {
+        setSize(numElements, desiredFalsePositive);
+        hType = convertHashType(hashType);
+    }
+
+
+    /**
+     * OR together all the serialized filters found in the bag that is the
+     * first field of input.
+     * @param input tuple whose first field is a bag of tuples, each with a
+     * serialized filter as its first field.
+     * @return the combined filter in serialized form.
+     */
+    protected DataByteArray bloomOr(Tuple input) throws IOException {
+        filter = new BloomFilter(vSize, numHash, hType);
+
+        try {
+            DataBag values = (DataBag)input.get(0);
+            for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+                Tuple t = it.next();
+                filter.or(bloomIn((DataByteArray)t.get(0)));
+            }
+        } catch (ExecException ee) {
+            throw new IOException(ee);
+        }
+
+        return bloomOut();
+    }
+
+    /**
+     * @return the current filter, serialized into a bytearray.
+     */
+    protected DataByteArray bloomOut() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(vSize / 8);
+        DataOutputStream dos = new DataOutputStream(baos);
+        filter.write(dos);
+        return new DataByteArray(baos.toByteArray());
+    }
+
+    /**
+     * @param b a filter in the serialized form produced by bloomOut().
+     * @return the deserialized filter.
+     */
+    protected BloomFilter bloomIn(DataByteArray b) throws IOException {
+        DataInputStream dis = new DataInputStream(new
+            ByteArrayInputStream(b.get()));
+        BloomFilter f = new BloomFilter();
+        f.readFields(dis);
+        return f;
+    }
+
+    // Map a user supplied name onto one of the Hash constants.  Any string
+    // containing "jenkins" or "murmur" (in any case) is accepted.
+    private int convertHashType(String hashType) {
+        // Lower-case with a fixed locale; with the default locale the
+        // result depends on where the JVM runs (in a Turkish locale "I"
+        // does not lower-case to "i", so "JENKINS" would be rejected).
+        String lowered = hashType.toLowerCase(java.util.Locale.ENGLISH);
+        if (lowered.contains("jenkins")) {
+            return Hash.JENKINS_HASH;
+        } else if (lowered.contains("murmur")) {
+            return Hash.MURMUR_HASH;
+        } else {
+            throw new RuntimeException("Unknown hash type " + hashType +
+                ".  Valid values are jenkins and murmur.");
+        }
+    }
+
+    // Derive the vector size and number of hash functions from the
+    // expected number of elements and the desired false positive rate.
+    private void setSize(String numElements, String desiredFalsePositive) {
+        int num = Integer.parseInt(numElements);
+        float fp = Float.parseFloat(desiredFalsePositive);
+        // The rate must be strictly between 0 and 1.  A rate of exactly 0
+        // makes Math.log(fp) negative infinity; written this way the check
+        // also rejects NaN.
+        if (num < 1 || !(fp > 0.0 && fp < 1.0)) {
+            throw new RuntimeException("Number of elements must be greater "
+                + "than zero and desiredFalsePositive must be between 0 "
+                + "and 1.");
+        }
+        // Rates close to 1 can truncate either value to 0; keep at least
+        // one bit and one hash function.
+        vSize = Math.max(1,
+            (int)(-1 * (num * Math.log(fp)) / Math.pow(Math.log(2), 2)));
+        log.info("BuildBloom setting vector size to " + vSize);
+
+        numHash = Math.max(1, (int)(0.7 * vSize / num));
+        log.info("BuildBloom setting number of hashes to " + numHash);
+    }
+
+
+}

Modified: pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf?rev=1197318&r1=1197317&r2=1197318&view=diff
==============================================================================
--- pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf Thu Nov  3 21:44:25 2011
@@ -3882,7 +3882,78 @@ store E into ':OUTPATH:';\, 
                                 store D into ':OUTPATH:';?,
                     }
                 ],
-            },
+            },{
+                'name' => 'Bloom',
+ 			    'execonly' => 'mapred', # distributed cache does not work in local mode
+                 'tests' => [
+                     {
+                         'num' => 1,
+                         'pig' => "define bb BuildBloom('Hash.JENKINS_HASH', 'fixed', '128', '3');
+                                 fs -rmr :TMP:/mybloom_1;
+                                 A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+                                 B = filter A by name == 'alice allen';
+                                 C = group B all;
+                                 D = foreach C generate bb(B.name);
+                                 store D into ':TMP:/mybloom_1';
+                                 exec;
+                                 define bloom Bloom(':TMP:/mybloom_1');
+                                 E = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+                                 F = filter E by bloom(name);
+                                 store F into ':OUTPATH:';",
+                         'notmq' => 1,
+                         'verify_pig_script' => "
+                                 A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:int ,gpa:double);
+                                 B = filter A by name == 'alice allen';
+                                 store B into ':OUTPATH:';",
+                     }, {
+                         'num' => 2,
+                         'pig' => "define bb BuildBloom('Hash.MURMUR_HASH', 'fixed', '128', '3');
+                                 fs -rmr :TMP:/mybloom_2;
+                                 A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+                                 B = filter A by name == 'alice allen';
+                                 C = group B all;
+                                 D = foreach C generate bb(B.name);
+                                 store D into ':TMP:/mybloom_2';
+                                 exec;
+                                 define bloom Bloom(':TMP:/mybloom_2');
+                                 E = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+                                 F = filter E by bloom(name);
+                                 G = load ':INPATH:/singlefile/votertab10k'as (name:chararray, age:int, reg:chararray, contrib:float);
+                                 H = join F by name, G by name;
+                                 store H into ':OUTPATH:';",
+                         'notmq' => 1,
+                         'verify_pig_script' => "
+                                 A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:int ,gpa:double);
+                                 B = filter A by name == 'alice allen';
+                                 C = load ':INPATH:/singlefile/votertab10k'as (name:chararray, age:int, reg:chararray, contrib:float);
+                                 D = join B by name, C by name;
+                                 store D into ':OUTPATH:';",
+                     },{
+                         'num' => 3,
+                         'pig' => "define bb BuildBloom('Hash.JENKINS_HASH', '1', '0.0001');
+                                 fs -rmr :TMP:/mybloom_3;
+                                 A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+                                 B = filter A by name == 'alice allen';
+                                 C = group B all;
+                                 D = foreach C generate bb(B.name);
+                                 store D into ':TMP:/mybloom_3';
+                                 exec;
+                                 define bloom Bloom(':TMP:/mybloom_3');
+                                 E = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+                                 F = filter E by bloom(name);
+                                 G = load ':INPATH:/singlefile/votertab10k'as (name:chararray, age:int, reg:chararray, contrib:float);
+                                 H = join G by name, F by name using 'repl';
+                                 store H into ':OUTPATH:';",
+                         'notmq' => 1,
+                         'verify_pig_script' => "
+                                 A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:int ,gpa:double);
+                                 B = filter A by name == 'alice allen';
+                                 C = load ':INPATH:/singlefile/votertab10k'as (name:chararray, age:int, reg:chararray, contrib:float);
+                                 D = join C by name, B by name;
+                                 store D into ':OUTPATH:';",
+                     }
+                 ],
+             }
         ],
     },
 ;

Added: pig/branches/branch-0.10/test/org/apache/pig/test/TestBloom.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/org/apache/pig/test/TestBloom.java?rev=1197318&view=auto
==============================================================================
--- pig/branches/branch-0.10/test/org/apache/pig/test/TestBloom.java (added)
+++ pig/branches/branch-0.10/test/org/apache/pig/test/TestBloom.java Thu Nov  3 21:44:25 2011
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import org.junit.Test;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.Bloom;
+import org.apache.pig.builtin.BuildBloom;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * This class unit tests the built in UDFs BuildBloom and Bloom.
+ */
+public class TestBloom extends junit.framework.TestCase {
+
+    // Number of keys that were never added which each test probes for, and
+    // how many of those the filter may wrongly accept.  A bloom filter can
+    // report false positives, so demanding that every probe is rejected
+    // could fail even though the filter is working correctly.
+    private static final int NUM_PROBES = 10;
+    private static final int MAX_FALSE_POSITIVES = 2;
+
+    // Exposes the size and hash count computed by the constructor.
+    static class TestBuildBloom extends BuildBloom {
+
+        TestBuildBloom(String numElements, String desiredFalsePositive) {
+            super("jenkins", numElements, desiredFalsePositive);
+        }
+
+        int getSize() { return vSize; }
+        int getNumHash() { return numHash; }
+
+    }
+
+    // Vector size and hash count derived from element count and error rate.
+    @Test
+    public void testSizeCalc() throws Exception {
+
+        TestBuildBloom tbb = new TestBuildBloom("1000", "0.01");
+        assertEquals(9585, tbb.getSize());
+        assertEquals(6, tbb.getNumHash());
+        tbb = new TestBuildBloom("1000000", "0.01");
+        assertEquals(9585058 , tbb.getSize());
+        assertEquals(6, tbb.getNumHash());
+        tbb = new TestBuildBloom("1000", "0.0001");
+        assertEquals(19170, tbb.getSize());
+        assertEquals(13, tbb.getNumHash());
+        tbb = new TestBuildBloom("1000000", "0.00001");
+        assertEquals(23962645, tbb.getSize());
+        assertEquals(16, tbb.getNumHash());
+    }
+
+    // An unrecognized hash name must be rejected by the constructor.
+    @Test
+    public void testBadHash() throws Exception {
+        String size = "100";
+        String numHash = "3";
+        String hashFunc = "nosuchhash";
+        boolean caughtException = false;
+        try {
+            new BuildBloom(hashFunc, "fixed", size, numHash);
+        } catch (RuntimeException re) {
+            assertTrue(re.getMessage().contains("Unknown hash type"));
+            caughtException = true;
+        }
+        assertTrue(caughtException);
+    }
+
+    // The algebraic stages must resolve to the nested classes.
+    @Test
+    public void testFuncNames() throws Exception {
+        String size = "100";
+        String numHash = "3";
+        String hashFunc = "JENKINS_HASH";
+        BuildBloom bb = new BuildBloom(hashFunc, "fixed", size, numHash);
+        assertEquals("org.apache.pig.builtin.BuildBloom$Initial",
+            bb.getInitial());
+        assertEquals("org.apache.pig.builtin.BuildBloom$Intermediate",
+            bb.getIntermed());
+        assertEquals("org.apache.pig.builtin.BuildBloom$Final",
+            bb.getFinal());
+    }
+
+    // A filter built by the Initial stage alone.
+    @Test
+    public void testMap() throws Exception {
+        String size = "100";
+        String numHash = "3";
+        String hashFunc = "JENKINS";
+        TupleFactory tf = TupleFactory.getInstance();
+        BagFactory bf = BagFactory.getInstance();
+
+        Tuple t = tf.newTuple(1);
+        t.set(0, 1);
+        DataBag b = bf.newDefaultBag();
+        b.add(t);
+        Tuple input = tf.newTuple(b);
+        
+        BuildBloom.Initial map =
+            new BuildBloom.Initial(hashFunc, "fixed", size, numHash);
+        t = map.exec(input);
+
+        Bloom bloom = new Bloom("bla");
+        bloom.setFilter((DataByteArray)t.get(0));
+
+        // Test that everything we put in passes.
+        Tuple t1 = tf.newTuple(1);
+        t1.set(0, 1);
+        assertTrue(bloom.exec(t1));
+
+        // Keys we never put in should be rejected.  The loop used to run
+        // from 100 while i < 10, so its body was never executed.
+        int falsePositives = 0;
+        for (int i = 100; i < 100 + NUM_PROBES; i++) {
+            Tuple t2 = tf.newTuple(1);
+            t2.set(0, i);
+            if (bloom.exec(t2)) falsePositives++;
+        }
+        assertTrue("too many false positives: " + falsePositives,
+            falsePositives <= MAX_FALSE_POSITIVES);
+    }
+
+    // Filters from three Initial calls merged by the Intermediate stage.
+    @Test
+    public void testCombiner() throws Exception {
+        String size = "100";
+        String numHash = "3";
+        String hashFunc = "jenkins";
+        TupleFactory tf = TupleFactory.getInstance();
+        BagFactory bf = BagFactory.getInstance();
+
+        DataBag combinerBag = bf.newDefaultBag();
+        for (int j = 0; j < 3; j++) { // map loop
+            Tuple t = tf.newTuple(1);
+            t.set(0, 10 + j);
+            DataBag mapBag = bf.newDefaultBag();
+            mapBag.add(t);
+            Tuple input = tf.newTuple(mapBag);
+            BuildBloom.Initial map =
+                new BuildBloom.Initial(hashFunc, "fixed", size, numHash);
+            combinerBag.add(map.exec(input));
+        }
+        Tuple t = tf.newTuple(1);
+        t.set(0, combinerBag);
+        BuildBloom.Intermediate combiner =
+            new BuildBloom.Intermediate(hashFunc, "fixed", size, numHash);
+        t = combiner.exec(t);
+
+        Bloom bloom = new Bloom("bla");
+        bloom.setFilter((DataByteArray)t.get(0));
+
+        // Test that everything we put in passes.
+        for (int j = 0; j < 3; j++) {
+            Tuple t1 = tf.newTuple(1);
+            t1.set(0, 10 + j);
+            assertTrue(bloom.exec(t1));
+        }
+
+        // Keys we never put in should be rejected.  The loop used to run
+        // from 100 while i < 10, so its body was never executed.
+        int falsePositives = 0;
+        for (int i = 100; i < 100 + NUM_PROBES; i++) {
+            Tuple t2 = tf.newTuple(1);
+            t2.set(0, i);
+            if (bloom.exec(t2)) falsePositives++;
+        }
+        assertTrue("too many false positives: " + falsePositives,
+            falsePositives <= MAX_FALSE_POSITIVES);
+    }
+
+    // All three stages, with single field keys.
+    @Test
+    public void testSingleKey() throws Exception {
+        String size = "100";
+        String numHash = "3";
+        String hashFunc = "MURMUR";
+        TupleFactory tf = TupleFactory.getInstance();
+        BagFactory bf = BagFactory.getInstance();
+
+        DataBag reducerBag = bf.newDefaultBag();
+        for (int i = 0; i < 3; i++) { // combiner loop
+            DataBag combinerBag = bf.newDefaultBag();
+            for (int j = 0; j < 3; j++) { // map loop
+                Tuple t = tf.newTuple(1);
+                t.set(0, i * 10 + j);
+                DataBag mapBag = bf.newDefaultBag();
+                mapBag.add(t);
+                Tuple input = tf.newTuple(mapBag);
+                BuildBloom.Initial map =
+                    new BuildBloom.Initial(hashFunc, "fixed", size, numHash);
+                combinerBag.add(map.exec(input));
+            }
+            Tuple t = tf.newTuple(1);
+            t.set(0, combinerBag);
+            BuildBloom.Intermediate combiner =
+                new BuildBloom.Intermediate(hashFunc, "fixed", size, numHash);
+            reducerBag.add(combiner.exec(t));
+        }
+
+        Tuple t = tf.newTuple(1);
+        t.set(0, reducerBag);
+        BuildBloom.Final reducer =
+            new BuildBloom.Final(hashFunc, "fixed", size, numHash);
+        DataByteArray dba = reducer.exec(t);
+
+        Bloom bloom = new Bloom("bla");
+        bloom.setFilter(dba);
+
+        // Test that everything we put in passes.
+        for (int i = 0; i < 3; i++) {
+            for (int j = 0; j < 3; j++) {
+                Tuple t1 = tf.newTuple(1);
+                t1.set(0, i * 10 + j);
+                assertTrue(bloom.exec(t1));
+            }
+        }
+
+        // Keys we never put in should be rejected.  The loop used to run
+        // from 100 while i < 10, so its body was never executed.
+        int falsePositives = 0;
+        for (int i = 100; i < 100 + NUM_PROBES; i++) {
+            Tuple t1 = tf.newTuple(1);
+            t1.set(0, i);
+            if (bloom.exec(t1)) falsePositives++;
+        }
+        assertTrue("too many false positives: " + falsePositives,
+            falsePositives <= MAX_FALSE_POSITIVES);
+    }
+
+    // All three stages, with two field keys and a computed filter size.
+    @Test
+    public void testMultiKey() throws Exception {
+        String numElements = "10";
+        String falsePositive = "0.001";
+        String hashFunc = "murmur";
+        TupleFactory tf = TupleFactory.getInstance();
+        BagFactory bf = BagFactory.getInstance();
+
+        String[][] strs = {
+            { "fred", "joe", "bob" },
+            { "mary", "sally", "jane" },
+            { "fido", "spot", "fluffly" }};
+
+        DataBag reducerBag = bf.newDefaultBag();
+        for (int i = 0; i < 3; i++) { // combiner loop
+            DataBag combinerBag = bf.newDefaultBag();
+            for (int j = 0; j < 3; j++) { // map loop
+                Tuple t = tf.newTuple(2);
+                t.set(0, i * 10 + j);
+                t.set(1, strs[i][j]);
+                DataBag mapBag = bf.newDefaultBag();
+                mapBag.add(t);
+                Tuple input = tf.newTuple(mapBag);
+                BuildBloom.Initial map =
+                    new BuildBloom.Initial(hashFunc, numElements,
+                        falsePositive);
+                combinerBag.add(map.exec(input));
+            }
+            Tuple t = tf.newTuple(1);
+            t.set(0, combinerBag);
+            BuildBloom.Intermediate combiner =
+                new BuildBloom.Intermediate(hashFunc, numElements,
+                    falsePositive);
+            reducerBag.add(combiner.exec(t));
+        }
+
+        Tuple t = tf.newTuple(1);
+        t.set(0, reducerBag);
+        BuildBloom.Final reducer =
+            new BuildBloom.Final(hashFunc, numElements, falsePositive);
+        DataByteArray dba = reducer.exec(t);
+
+        Bloom bloom = new Bloom("bla");
+        bloom.setFilter(dba);
+
+        // Test that everything we put in passes.
+        for (int i = 0; i < 3; i++) {
+            for (int j = 0; j < 3; j++) {
+                Tuple t1 = tf.newTuple(2);
+                t1.set(0, i * 10 + j);
+                t1.set(1, strs[i][j]);
+                assertTrue(bloom.exec(t1));
+            }
+        }
+
+        // Keys we never put in should be rejected.  The loop used to run
+        // from 100 while i < 10, so its body was never executed.
+        int falsePositives = 0;
+        for (int i = 100; i < 100 + NUM_PROBES; i++) {
+            Tuple t1 = tf.newTuple(2);
+            t1.set(0, i);
+            t1.set(1, "ichabod");
+            if (bloom.exec(t1)) falsePositives++;
+        }
+        assertTrue("too many false positives: " + falsePositives,
+            falsePositives <= MAX_FALSE_POSITIVES);
+    }
+}