You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2011/11/03 02:07:53 UTC
svn commit: r1196908 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/builtin/Bloom.java
src/org/apache/pig/builtin/BuildBloom.java
src/org/apache/pig/builtin/BuildBloomBase.java
test/e2e/pig/tests/nightly.conf test/org/apache/pig/test/TestBloom.java
Author: gates
Date: Thu Nov 3 01:07:52 2011
New Revision: 1196908
URL: http://svn.apache.org/viewvc?rev=1196908&view=rev
Log:
PIG-2328: Add builtin UDFs for building and using bloom filters
Added:
pig/trunk/src/org/apache/pig/builtin/Bloom.java
pig/trunk/src/org/apache/pig/builtin/BuildBloom.java
pig/trunk/src/org/apache/pig/builtin/BuildBloomBase.java
pig/trunk/test/org/apache/pig/test/TestBloom.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/test/e2e/pig/tests/nightly.conf
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1196908&r1=1196907&r2=1196908&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Nov 3 01:07:52 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2328: Add builtin UDFs for building and using bloom filters (gates)
+
PIG-2338: Need signature for EvalFunc (daijy)
OPTIMIZATIONS
Added: pig/trunk/src/org/apache/pig/builtin/Bloom.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/Bloom.java?rev=1196908&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/Bloom.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/Bloom.java Thu Nov 3 01:07:52 2011
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+
+import org.apache.pig.FilterFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Use a Bloom filter built previously by BuildBloom. You would first
+ * build a bloom filter with BuildBloom, which is intended to run
+ * in a group all job. For example:
+ * define bb BuildBloom('jenkins', '100', '0.1');
+ * A = load 'foo' as (x, y);
+ * B = group A all;
+ * C = foreach B generate bb(A.x);
+ * store C into 'mybloom';
+ * The bloom filter can be on multiple keys by passing more than one field
+ * (or the entire bag) to BuildBloom.
+ * The resulting file can then be used in a Bloom filter as:
+ * define bloom Bloom('mybloom');
+ * A = load 'foo' as (x, y);
+ * B = load 'bar' as (z);
+ * C = filter B by bloom(z);
+ * D = join C by z, A by x;
+ * It uses {@link org.apache.hadoop.util.bloom.BloomFilter}.
+ */
+public class Bloom extends FilterFunc {
+
+    /** Name of the file holding the serialized Bloom filter, as given to the constructor. */
+    private String bloomFile;
+
+    /**
+     * The filter consulted by {@link #exec(Tuple)}. It stays null until it
+     * is either read from the local copy of the file on the first call to
+     * exec, or supplied through {@link #setFilter(DataByteArray)}.
+     */
+    public BloomFilter filter = null;
+
+    /**
+     * @param filename file containing the serialized Bloom filter
+     */
+    public Bloom(String filename) {
+        bloomFile = filename;
+    }
+
+    /**
+     * Test whether the input may have been added to the Bloom filter.
+     * A tuple with exactly one field is keyed on that field alone; any
+     * other tuple is serialized as a whole to form the key.
+     * @param input tuple holding the key field(s) to look up
+     * @return the result of the filter's membership test for the key
+     * @throws IOException if loading the filter or building the key fails
+     */
+    @Override
+    public Boolean exec(Tuple input) throws IOException {
+        if (filter == null) {
+            init();
+        }
+        byte[] b;
+        if (input.size() == 1) b = DataType.toBytes(input.get(0));
+        else b = DataType.toBytes(input, DataType.TUPLE);
+
+        Key k = new Key(b);
+        return filter.membershipTest(k);
+    }
+
+    /**
+     * Request that the Bloom filter file be made available to the tasks.
+     * @return a single entry of the form <i>file</i>#<i>localname</i>, where
+     * the local name is the file name with every "/" replaced by "_"
+     */
+    @Override
+    public List<String> getCacheFiles() {
+        List<String> list = new ArrayList<String>(1);
+        // We were passed the name of the file on HDFS. Append a
+        // name for the file on the task node.
+        list.add(bloomFile + "#" + getFilenameFromPath(bloomFile));
+        return list;
+    }
+
+    /**
+     * Read the serialized filter from part-r-00000 under the local name
+     * announced by {@link #getCacheFiles()}.
+     * @throws IOException if the file cannot be opened or read
+     */
+    private void init() throws IOException {
+        String dcFile = "./" + getFilenameFromPath(bloomFile) +
+            "/part-r-00000";
+        BloomFilter loaded = new BloomFilter();
+        DataInputStream in = new DataInputStream(new FileInputStream(dcFile));
+        try {
+            loaded.readFields(in);
+        } finally {
+            // Always release the file handle, even when the read fails.
+            in.close();
+        }
+        // Publish the filter only after a complete read, so that a failed
+        // load is retried on the next call instead of leaving a partially
+        // initialized filter behind.
+        filter = loaded;
+    }
+
+    /**
+     * For testing only, do not use directly.
+     * @param dba serialized Bloom filter to install as this UDF's filter
+     * @throws IOException if the bytes cannot be read as a Bloom filter
+     */
+    public void setFilter(DataByteArray dba) throws IOException {
+        DataInputStream dis = new DataInputStream(new
+            ByteArrayInputStream(dba.get()));
+        filter = new BloomFilter();
+        filter.readFields(dis);
+    }
+
+    /**
+     * Turn a path into a flat name by replacing every "/" with "_".
+     */
+    private String getFilenameFromPath(String p) {
+        return p.replace("/", "_");
+    }
+
+}
Added: pig/trunk/src/org/apache/pig/builtin/BuildBloom.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/BuildBloom.java?rev=1196908&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/BuildBloom.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/BuildBloom.java Thu Nov 3 01:07:52 2011
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Build a bloom filter for use later in Bloom. This UDF is intended to run
+ * in a group all job. For example:
+ * define bb BuildBloom('jenkins', '100', '0.1');
+ * A = load 'foo' as (x, y);
+ * B = group A all;
+ * C = foreach B generate bb(A.x);
+ * store C into 'mybloom';
+ * The bloom filter can be on multiple keys by passing more than one field
+ * (or the entire bag) to BuildBloom.
+ * The resulting file can then be used in a Bloom filter as:
+ * define bloom Bloom('mybloom');
+ * A = load 'foo' as (x, y);
+ * B = load 'bar' as (z);
+ * C = filter B by bloom(z);
+ * D = join C by z, A by x;
+ * It uses {@link org.apache.hadoop.util.bloom.BloomFilter}.
+ */
+public class BuildBloom extends BuildBloomBase<DataByteArray> implements Algebraic {
+
+    /**
+     * Build a bloom filter of fixed size and number of hash functions.
+     * @param hashType type of the hashing function (see
+     * {@link org.apache.hadoop.util.hash.Hash}).
+     * @param mode Will be ignored, though by convention it should be
+     * "fixed" or "fixedsize"
+     * @param vectorSize The vector size of this filter.
+     * @param nbHash The number of hash functions to consider.
+     */
+    public BuildBloom(String hashType,
+                      String mode,
+                      String vectorSize,
+                      String nbHash) {
+        super(hashType, mode, vectorSize, nbHash);
+    }
+
+    /**
+     * Construct a Bloom filter based on expected number of elements and
+     * desired accuracy.
+     * @param hashType type of the hashing function (see
+     * {@link org.apache.hadoop.util.hash.Hash}).
+     * @param numElements The number of distinct elements expected to be
+     * placed in this filter.
+     * @param desiredFalsePositive the acceptable rate of false positives.
+     * This should be a floating point value between 0 and 1.0, where 1.0
+     * would be 100% (ie, a totally useless filter).
+     */
+    public BuildBloom(String hashType,
+                      String numElements,
+                      String desiredFalsePositive) {
+        super(hashType, numElements, desiredFalsePositive);
+    }
+
+    /**
+     * Not supported: the filter is only built through the algebraic
+     * Initial, Intermediate and Final stages.
+     * @throws IOException always
+     */
+    @Override
+    public DataByteArray exec(Tuple input) throws IOException {
+        throw new IOException("This must be used with algebraic!");
+    }
+
+    /** @return the class name of the {@link Initial} stage */
+    public String getInitial() {
+        return Initial.class.getName();
+    }
+
+    /** @return the class name of the {@link Intermediate} stage */
+    public String getIntermed() {
+        return Intermediate.class.getName();
+    }
+
+    /** @return the class name of the {@link Final} stage */
+    public String getFinal() {
+        return Final.class.getName();
+    }
+
+    /**
+     * First algebraic stage: turns the tuples of the input bag into a
+     * serialized Bloom filter wrapped in a tuple.
+     */
+    public static class Initial extends BuildBloomBase<Tuple> {
+
+        public Initial() {
+        }
+
+        public Initial(String hashType,
+                       String mode,
+                       String vectorSize,
+                       String nbHash) {
+            super(hashType, mode, vectorSize, nbHash);
+        }
+
+        public Initial(String hashType,
+                       String numElements,
+                       String desiredFalsePositive) {
+            super(hashType, numElements, desiredFalsePositive);
+        }
+
+        /**
+         * Build a filter holding the key of every tuple in the bag found
+         * in the first field of the input.
+         * @param input tuple whose first field is the bag of key tuples
+         * @return a tuple holding the serialized filter, or null when the
+         * input itself is null or has no fields
+         * @throws IOException if a key or the filter cannot be serialized
+         */
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            if (input == null || input.size() == 0) return null;
+
+            filter = new BloomFilter(vSize, numHash, hType);
+
+            // Strip off the initial level of bag
+            DataBag values = (DataBag)input.get(0);
+            if (values != null) {
+                // Add every tuple rather than only the first, so that an
+                // empty bag yields an empty filter instead of failing in
+                // Iterator.next() and a larger bag loses no keys.
+                for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+                    filter.add(toKey(it.next()));
+                }
+            }
+
+            return TupleFactory.getInstance().newTuple(bloomOut());
+        }
+
+        /**
+         * Serialize a tuple into a filter key. If the tuple has only one
+         * field that field is serialized on its own; if it has multiple
+         * fields the whole tuple is serialized.
+         */
+        private static Key toKey(Tuple t) throws IOException {
+            byte[] b;
+            if (t.size() == 1) b = DataType.toBytes(t.get(0));
+            else b = DataType.toBytes(t, DataType.TUPLE);
+            return new Key(b);
+        }
+    }
+
+    /**
+     * Second algebraic stage: ORs together the serialized filters it is
+     * given and returns the result wrapped in a tuple.
+     */
+    public static class Intermediate extends BuildBloomBase<Tuple> {
+
+        public Intermediate() {
+        }
+
+        public Intermediate(String hashType,
+                            String mode,
+                            String vectorSize,
+                            String nbHash) {
+            super(hashType, mode, vectorSize, nbHash);
+        }
+
+        public Intermediate(String hashType,
+                            String numElements,
+                            String desiredFalsePositive) {
+            super(hashType, numElements, desiredFalsePositive);
+        }
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            return TupleFactory.getInstance().newTuple(bloomOr(input));
+        }
+    }
+
+    /**
+     * Last algebraic stage: ORs together the serialized filters it is
+     * given and returns the combined filter as a bytearray.
+     */
+    public static class Final extends BuildBloomBase<DataByteArray> {
+
+        public Final() {
+        }
+
+        public Final(String hashType,
+                     String mode,
+                     String vectorSize,
+                     String nbHash) {
+            super(hashType, mode, vectorSize, nbHash);
+        }
+
+        public Final(String hashType,
+                     String numElements,
+                     String desiredFalsePositive) {
+            super(hashType, numElements, desiredFalsePositive);
+        }
+
+        @Override
+        public DataByteArray exec(Tuple input) throws IOException {
+            return bloomOr(input);
+        }
+    }
+
+    /**
+     * @return a schema with a single unnamed bytearray field
+     */
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+    }
+
+}
Added: pig/trunk/src/org/apache/pig/builtin/BuildBloomBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/BuildBloomBase.java?rev=1196908&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/BuildBloomBase.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/BuildBloomBase.java Thu Nov 3 01:07:52 2011
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Iterator;
+
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.hash.Hash;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+/**
+ * A Base class for BuildBloom and its Algebraic implementations.
+ */
+public abstract class BuildBloomBase<T> extends EvalFunc<T> {
+
+    /** Vector size handed to the {@link BloomFilter} constructor. */
+    protected int vSize;
+    /** Number of hash functions handed to the {@link BloomFilter} constructor. */
+    protected int numHash;
+    /** Hash function type, one of the constants defined in {@link Hash}. */
+    protected int hType;
+    /** The filter most recently built by this instance. */
+    protected BloomFilter filter;
+
+    protected BuildBloomBase() {
+    }
+
+    /**
+     * @param hashType type of the hashing function (see
+     * {@link org.apache.hadoop.util.hash.Hash}).
+     * @param mode Will be ignored, though by convention it should be
+     * "fixed" or "fixedsize"
+     * @param vectorSize The vector size of <i>this</i> filter.
+     * @param nbHash The number of hash functions to consider.
+     */
+    public BuildBloomBase(String hashType,
+                          String mode,
+                          String vectorSize,
+                          String nbHash) {
+        vSize = Integer.parseInt(vectorSize);
+        numHash = Integer.parseInt(nbHash);
+        hType = convertHashType(hashType);
+    }
+
+    /**
+     * @param hashType type of the hashing function (see
+     * {@link org.apache.hadoop.util.hash.Hash}).
+     * @param numElements The number of distinct elements expected to be
+     * placed in this filter.
+     * @param desiredFalsePositive the acceptable rate of false positives.
+     * This should be a floating point value between 0 and 1.0, where 1.0
+     * would be 100% (ie, a totally useless filter).
+     */
+    public BuildBloomBase(String hashType,
+                          String numElements,
+                          String desiredFalsePositive) {
+        setSize(numElements, desiredFalsePositive);
+        hType = convertHashType(hashType);
+    }
+
+
+    /**
+     * OR together the serialized filters found in the first field of each
+     * tuple of the bag held in the first field of the input.
+     * @param input tuple whose first field is a bag of tuples, each
+     * carrying a serialized filter
+     * @return the combined filter in serialized form
+     * @throws IOException if a filter cannot be read or written
+     */
+    protected DataByteArray bloomOr(Tuple input) throws IOException {
+        filter = new BloomFilter(vSize, numHash, hType);
+
+        try {
+            DataBag values = (DataBag)input.get(0);
+            if (values != null) {
+                for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+                    Tuple t = it.next();
+                    // An earlier stage may hand back null (BuildBloom.Initial
+                    // does for null or empty input), so skip entries that
+                    // carry no serialized filter instead of failing.
+                    if (t == null || t.get(0) == null) continue;
+                    filter.or(bloomIn((DataByteArray)t.get(0)));
+                }
+            }
+        } catch (ExecException ee) {
+            throw new IOException(ee);
+        }
+
+        return bloomOut();
+    }
+
+    /**
+     * Serialize the current filter.
+     * @return the bytes written by the filter, wrapped in a DataByteArray
+     */
+    protected DataByteArray bloomOut() throws IOException {
+        // vSize / 8 is only the initial buffer capacity; the stream grows
+        // as needed.
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(vSize / 8);
+        DataOutputStream dos = new DataOutputStream(baos);
+        filter.write(dos);
+        return new DataByteArray(baos.toByteArray());
+    }
+
+    /**
+     * Deserialize a filter previously produced by {@link #bloomOut()}.
+     * @param b serialized filter
+     * @return the filter read from the bytes
+     */
+    protected BloomFilter bloomIn(DataByteArray b) throws IOException {
+        DataInputStream dis = new DataInputStream(new
+            ByteArrayInputStream(b.get()));
+        BloomFilter f = new BloomFilter();
+        f.readFields(dis);
+        return f;
+    }
+
+    /**
+     * Map a user supplied name onto one of the {@link Hash} constants. Any
+     * string containing "jenkins" or "murmur", in any case, is accepted.
+     * @throws RuntimeException if the name matches neither hash
+     */
+    private int convertHashType(String hashType) {
+        // Lower-case with a fixed locale: under the default locale the
+        // result can differ (for instance "I" becomes a dotless i in
+        // Turkish) and a valid name would then fail to match.
+        String normalized = hashType.toLowerCase(java.util.Locale.ENGLISH);
+        if (normalized.contains("jenkins")) {
+            return Hash.JENKINS_HASH;
+        } else if (normalized.contains("murmur")) {
+            return Hash.MURMUR_HASH;
+        } else {
+            throw new RuntimeException("Unknown hash type " + hashType +
+                ". Valid values are jenkins and murmur.");
+        }
+    }
+
+    /**
+     * Derive the vector size and number of hash functions from the
+     * expected number of elements and the desired false positive rate.
+     * @throws RuntimeException if numElements is less than one or the
+     * rate is not strictly between 0 and 1
+     */
+    private void setSize(String numElements, String desiredFalsePositive) {
+        int num = Integer.parseInt(numElements);
+        float fp = Float.parseFloat(desiredFalsePositive);
+        // The rate check is written as a negated range test so that NaN is
+        // rejected too. A rate of exactly 0 has to be refused because
+        // log(0) is negative infinity, which would saturate the vector
+        // size at Integer.MAX_VALUE.
+        if (num < 1 || !(fp > 0.0 && fp < 1.0)) {
+            throw new RuntimeException("Number of elements must be greater "
+                + "than zero and desiredFalsePositive must be between 0 "
+                + " and 1.");
+        }
+        // m = -n * ln(p) / (ln 2)^2, truncated. Keep at least one bit:
+        // few elements combined with a high rate would otherwise
+        // truncate to zero.
+        vSize = Math.max(1,
+            (int)(-1 * (num * Math.log(fp)) / Math.pow(Math.log(2), 2)));
+        log.info("BuildBloom setting vector size to " + vSize);
+
+        // k = 0.7 * m / n, truncated. Keep at least one hash function for
+        // the same reason.
+        numHash = Math.max(1, (int)(0.7 * vSize / num));
+        log.info("BuildBloom setting number of hashes to " + numHash);
+    }
+
+
+}
Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1196908&r1=1196907&r2=1196908&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Thu Nov 3 01:07:52 2011
@@ -3883,6 +3883,77 @@ store E into ':OUTPATH:';\,
}
],
},{
+ 'name' => 'Bloom',
+ 'execonly' => 'mapred', # distributed cache does not work in local mode
+ 'tests' => [
+ {
+ 'num' => 1,
+ 'pig' => "define bb BuildBloom('Hash.JENKINS_HASH', 'fixed', '128', '3');
+ fs -rmr :TMP:/mybloom_1;
+ A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+ B = filter A by name == 'alice allen';
+ C = group B all;
+ D = foreach C generate bb(B.name);
+ store D into ':TMP:/mybloom_1';
+ exec;
+ define bloom Bloom(':TMP:/mybloom_1');
+ E = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+ F = filter E by bloom(name);
+ store F into ':OUTPATH:';",
+ 'notmq' => 1,
+ 'verify_pig_script' => "
+ A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:int ,gpa:double);
+ B = filter A by name == 'alice allen';
+ store B into ':OUTPATH:';",
+ }, {
+ 'num' => 2,
+ 'pig' => "define bb BuildBloom('Hash.MURMUR_HASH', 'fixed', '128', '3');
+ fs -rmr :TMP:/mybloom_2;
+ A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+ B = filter A by name == 'alice allen';
+ C = group B all;
+ D = foreach C generate bb(B.name);
+ store D into ':TMP:/mybloom_2';
+ exec;
+ define bloom Bloom(':TMP:/mybloom_2');
+ E = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+ F = filter E by bloom(name);
+ G = load ':INPATH:/singlefile/votertab10k'as (name:chararray, age:int, reg:chararray, contrib:float);
+ H = join F by name, G by name;
+ store H into ':OUTPATH:';",
+ 'notmq' => 1,
+ 'verify_pig_script' => "
+ A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:int ,gpa:double);
+ B = filter A by name == 'alice allen';
+ C = load ':INPATH:/singlefile/votertab10k'as (name:chararray, age:int, reg:chararray, contrib:float);
+ D = join B by name, C by name;
+ store D into ':OUTPATH:';",
+ },{
+ 'num' => 3,
+ 'pig' => "define bb BuildBloom('Hash.JENKINS_HASH', '1', '0.0001');
+ fs -rmr :TMP:/mybloom_3;
+ A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+ B = filter A by name == 'alice allen';
+ C = group B all;
+ D = foreach C generate bb(B.name);
+ store D into ':TMP:/mybloom_3';
+ exec;
+ define bloom Bloom(':TMP:/mybloom_3');
+ E = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+ F = filter E by bloom(name);
+ G = load ':INPATH:/singlefile/votertab10k'as (name:chararray, age:int, reg:chararray, contrib:float);
+ H = join G by name, F by name using 'repl';
+ store H into ':OUTPATH:';",
+ 'notmq' => 1,
+ 'verify_pig_script' => "
+ A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:int ,gpa:double);
+ B = filter A by name == 'alice allen';
+ C = load ':INPATH:/singlefile/votertab10k'as (name:chararray, age:int, reg:chararray, contrib:float);
+ D = join C by name, B by name;
+ store D into ':OUTPATH:';",
+ }
+ ],
+ },{
'name' => 'UDFContext',
'tests' => [
{
Added: pig/trunk/test/org/apache/pig/test/TestBloom.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBloom.java?rev=1196908&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBloom.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestBloom.java Thu Nov 3 01:07:52 2011
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import org.junit.Test;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.Bloom;
+import org.apache.pig.builtin.BuildBloom;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * This class unit tests the built in UDFs BuildBloom and Bloom.
+ */
+public class TestBloom extends junit.framework.TestCase {
+
+ static class TestBuildBloom extends BuildBloom {
+
+ TestBuildBloom(String numElements, String desiredFalsePositive) {
+ super("jenkins", numElements, desiredFalsePositive);
+ }
+
+ int getSize() { return vSize; }
+ int getNumHash() { return numHash; }
+
+ }
+
+ @Test
+ public void testSizeCalc() throws Exception {
+
+ TestBuildBloom tbb = new TestBuildBloom("1000", "0.01");
+ assertEquals(9585, tbb.getSize());
+ assertEquals(6, tbb.getNumHash());
+ tbb = new TestBuildBloom("1000000", "0.01");
+ assertEquals(9585058 , tbb.getSize());
+ assertEquals(6, tbb.getNumHash());
+ tbb = new TestBuildBloom("1000", "0.0001");
+ assertEquals(19170, tbb.getSize());
+ assertEquals(13, tbb.getNumHash());
+ tbb = new TestBuildBloom("1000000", "0.00001");
+ assertEquals(23962645, tbb.getSize());
+ assertEquals(16, tbb.getNumHash());
+ }
+
+ @Test
+ public void testBadHash() throws Exception {
+ String size = "100";
+ String numHash = "3";
+ String hashFunc = "nosuchhash";
+ boolean caughtException = false;
+ try {
+ BuildBloom bb = new BuildBloom(hashFunc, "fixed", size, numHash);
+ } catch (RuntimeException re) {
+ assertTrue(re.getMessage().contains("Unknown hash type"));
+ caughtException = true;
+ }
+ assertTrue(caughtException);
+ }
+
+ @Test
+ public void testFuncNames() throws Exception {
+ String size = "100";
+ String numHash = "3";
+ String hashFunc = "JENKINS_HASH";
+ BuildBloom bb = new BuildBloom(hashFunc, "fixed", size, numHash);
+ assertEquals("org.apache.pig.builtin.BuildBloom$Initial",
+ bb.getInitial());
+ assertEquals("org.apache.pig.builtin.BuildBloom$Intermediate",
+ bb.getIntermed());
+ assertEquals("org.apache.pig.builtin.BuildBloom$Final",
+ bb.getFinal());
+ }
+
+ @Test
+ public void testMap() throws Exception {
+ String size = "100";
+ String numHash = "3";
+ String hashFunc = "JENKINS";
+ TupleFactory tf = TupleFactory.getInstance();
+ BagFactory bf = BagFactory.getInstance();
+
+ Tuple t = tf.newTuple(1);
+ t.set(0, 1);
+ DataBag b = bf.newDefaultBag();
+ b.add(t);
+ Tuple input = tf.newTuple(b);
+
+ BuildBloom.Initial map =
+ new BuildBloom.Initial(hashFunc, "fixed", size, numHash);
+ t = map.exec(input);
+
+ Bloom bloom = new Bloom("bla");
+ bloom.setFilter((DataByteArray)t.get(0));
+
+ // Test that everything we put in passes.
+ Tuple t1 = tf.newTuple(1);
+ t1.set(0, 1);
+ assertTrue(bloom.exec(t1));
+
+ // A few that don't pass
+ for (int i = 100; i < 10; i++) {
+ Tuple t2 = tf.newTuple(1);
+ t2.set(0, i);
+ assertFalse(bloom.exec(t2));
+ }
+ }
+
+ @Test
+ public void testCombiner() throws Exception {
+ String size = "100";
+ String numHash = "3";
+ String hashFunc = "jenkins";
+ TupleFactory tf = TupleFactory.getInstance();
+ BagFactory bf = BagFactory.getInstance();
+
+ DataBag combinerBag = bf.newDefaultBag();
+ for (int j = 0; j < 3; j++) { // map loop
+ Tuple t = tf.newTuple(1);
+ t.set(0, 10 + j);
+ DataBag mapBag = bf.newDefaultBag();
+ mapBag.add(t);
+ Tuple input = tf.newTuple(mapBag);
+ BuildBloom.Initial map =
+ new BuildBloom.Initial(hashFunc, "fixed", size, numHash);
+ combinerBag.add(map.exec(input));
+ }
+ Tuple t = tf.newTuple(1);
+ t.set(0, combinerBag);
+ BuildBloom.Intermediate combiner =
+ new BuildBloom.Intermediate(hashFunc, "fixed", size, numHash);
+ t = combiner.exec(t);
+
+ Bloom bloom = new Bloom("bla");
+ bloom.setFilter((DataByteArray)t.get(0));
+
+ // Test that everything we put in passes.
+ for (int j = 0; j < 3; j++) {
+ Tuple t1 = tf.newTuple(1);
+ t1.set(0, 10 + j);
+ assertTrue(bloom.exec(t1));
+ }
+
+ // A few that don't pass
+ for (int i = 100; i < 10; i++) {
+ Tuple t2 = tf.newTuple(1);
+ t2.set(0, i);
+ assertFalse(bloom.exec(t2));
+ }
+ }
+
+ @Test
+ public void testSingleKey() throws Exception {
+ String size = "100";
+ String numHash = "3";
+ String hashFunc = "MURMUR";
+ TupleFactory tf = TupleFactory.getInstance();
+ BagFactory bf = BagFactory.getInstance();
+
+ DataBag reducerBag = bf.newDefaultBag();
+ for (int i = 0; i < 3; i++) { // combiner loop
+ DataBag combinerBag = bf.newDefaultBag();
+ for (int j = 0; j < 3; j++) { // map loop
+ Tuple t = tf.newTuple(1);
+ t.set(0, i * 10 + j);
+ DataBag mapBag = bf.newDefaultBag();
+ mapBag.add(t);
+ Tuple input = tf.newTuple(mapBag);
+ BuildBloom.Initial map =
+ new BuildBloom.Initial(hashFunc, "fixed", size, numHash);
+ combinerBag.add(map.exec(input));
+ }
+ Tuple t = tf.newTuple(1);
+ t.set(0, combinerBag);
+ BuildBloom.Intermediate combiner =
+ new BuildBloom.Intermediate(hashFunc, "fixed", size, numHash);
+ reducerBag.add(combiner.exec(t));
+ }
+
+ Tuple t = tf.newTuple(1);
+ t.set(0, reducerBag);
+ BuildBloom.Final reducer =
+ new BuildBloom.Final(hashFunc, "fixed", size, numHash);
+ DataByteArray dba = reducer.exec(t);
+
+ Bloom bloom = new Bloom("bla");
+ bloom.setFilter(dba);
+
+ // Test that everything we put in passes.
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 3; j++) {
+ Tuple t1 = tf.newTuple(1);
+ t1.set(0, i * 10 + j);
+ assertTrue(bloom.exec(t1));
+ }
+ }
+
+ // A few that don't pass
+ for (int i = 100; i < 10; i++) {
+ Tuple t1 = tf.newTuple(1);
+ t1.set(0, i);
+ assertFalse(bloom.exec(t1));
+ }
+ }
+
+ @Test
+ public void testMultiKey() throws Exception {
+ String numElements = "10";
+ String falsePositive = "0.001";
+ String hashFunc = "murmur";
+ TupleFactory tf = TupleFactory.getInstance();
+ BagFactory bf = BagFactory.getInstance();
+
+ String[][] strs = {
+ { "fred", "joe", "bob" },
+ { "mary", "sally", "jane" },
+ { "fido", "spot", "fluffly" }};
+
+ DataBag reducerBag = bf.newDefaultBag();
+ for (int i = 0; i < 3; i++) { // combiner loop
+ DataBag combinerBag = bf.newDefaultBag();
+ for (int j = 0; j < 3; j++) { // map loop
+ Tuple t = tf.newTuple(2);
+ t.set(0, i * 10 + j);
+ t.set(1, strs[i][j]);
+ DataBag mapBag = bf.newDefaultBag();
+ mapBag.add(t);
+ Tuple input = tf.newTuple(mapBag);
+ BuildBloom.Initial map =
+ new BuildBloom.Initial(hashFunc, numElements,
+ falsePositive);
+ combinerBag.add(map.exec(input));
+ }
+ Tuple t = tf.newTuple(1);
+ t.set(0, combinerBag);
+ BuildBloom.Intermediate combiner =
+ new BuildBloom.Intermediate(hashFunc, numElements,
+ falsePositive);
+ reducerBag.add(combiner.exec(t));
+ }
+
+ Tuple t = tf.newTuple(1);
+ t.set(0, reducerBag);
+ BuildBloom.Final reducer =
+ new BuildBloom.Final(hashFunc, numElements, falsePositive);
+ DataByteArray dba = reducer.exec(t);
+
+ Bloom bloom = new Bloom("bla");
+ bloom.setFilter(dba);
+
+ // Test that everything we put in passes.
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 3; j++) {
+ Tuple t1 = tf.newTuple(2);
+ t1.set(0, i * 10 + j);
+ t1.set(1, strs[i][j]);
+ assertTrue(bloom.exec(t1));
+ }
+ }
+
+ // A few that don't pass
+ for (int i = 100; i < 10; i++) {
+ Tuple t1 = tf.newTuple(2);
+ t1.set(0, i);
+ t1.set(1, "ichabod");
+ assertFalse(bloom.exec(t1));
+ }
+ }}