You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by rs...@apache.org on 2012/10/10 18:32:08 UTC

[3/3] CRUNCH-75: Added BloomFilters in crunch-contrib

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java
new file mode 100644
index 0000000..825b445
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.bloomfilter;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.crunch.CombineFn.Aggregator;
+import org.apache.crunch.CombineFn.AggregatorCombineFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PObject;
+import org.apache.crunch.PTable;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.materialize.pobject.FirstElementPObject;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.bloom.BloomFilter;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Factory Class for creating BloomFilters. The APIs require a
+ * {@link BloomFilterFn} which is responsible for generating keys of the filter.
+ */
+public class BloomFilterFactory {
+  /**
+   * Lists {@code inputPath}, reads each listed entry as a text file and builds a
+   * BloomFilter per entry through {@code filterFn}. Returns a {@link PObject} of a
+   * {@link Map} intended to have file names as keys and filters as values.
+   */
+  public static PObject<Map<String, BloomFilter>> createFilter(Path inputPath, BloomFilterFn<String> filterFn)
+      throws IOException {
+    MRPipeline pipeline = new MRPipeline(BloomFilterFactory.class);
+    FileStatus[] listStatus = FileSystem.get(pipeline.getConfiguration()).listStatus(inputPath);
+    PTable<String, BloomFilter> filterTable = null; // union of the per-file tables built so far
+    for (FileStatus fileStatus : listStatus) {
+      Path path = fileStatus.getPath();
+      PCollection<String> readTextFile = pipeline.readTextFile(path.toString());
+      pipeline.getConfiguration().set(BloomFilterFn.CRUNCH_FILTER_NAME, path.getName()); // NOTE(review): same key is overwritten on every iteration - confirm each file keeps its own name
+      PTable<String, BloomFilter> currentTable = createFilterTable(readTextFile, filterFn);
+      if (filterTable != null) {
+        filterTable = filterTable.union(currentTable);
+      } else {
+        filterTable = currentTable;
+      }
+    }
+    return filterTable.asMap(); // NOTE(review): filterTable is still null here when listStatus is empty
+  }
+  /** Builds a single BloomFilter from {@code collection}; the filter name is set to the collection's name. */
+  public static <T> PObject<BloomFilter> createFilter(PCollection<T> collection, BloomFilterFn<T> filterFn) {
+    collection.getPipeline().getConfiguration().set(BloomFilterFn.CRUNCH_FILTER_NAME, collection.getName());
+    return new FirstElementPObject<BloomFilter>(createFilterTable(collection, filterFn).values());
+  }
+  /** Runs {@code filterFn} over {@code collection}, groups the emitted filters by name and ORs each group together. */
+  private static <T> PTable<String, BloomFilter> createFilterTable(PCollection<T> collection, BloomFilterFn<T> filterFn) {
+    PTypeFamily tf = collection.getTypeFamily();
+    PTable<String, BloomFilter> table = collection.parallelDo(filterFn,
+        tf.tableOf(tf.strings(), Writables.writables(BloomFilter.class)));
+    return table.groupByKey(1).combineValues(new AggregatorCombineFn<String, BloomFilter>(new BloomFilterAggregator()));
+  }
+
+}
+
+@SuppressWarnings("serial")
+class BloomFilterAggregator implements Aggregator<BloomFilter> { // combines BloomFilters by OR-ing them into one
+  private transient BloomFilter bloomFilter = null; // accumulator; assigned in reset()
+  private transient int filterSize; // set in initialize(Configuration)
+  /** ORs {@code value} into the accumulator. NOTE(review): the accumulator is null until reset() has run. */
+  @Override
+  public void update(BloomFilter value) {
+    bloomFilter.or(value);
+  }
+  /** Returns the accumulated filter as a one-element list. */
+  @Override
+  public Iterable<BloomFilter> results() {
+    return ImmutableList.of(bloomFilter);
+  }
+  /** Reads the filter size from {@code configuration} for later use by reset(). */
+  @Override
+  public void initialize(Configuration configuration) {
+    filterSize = BloomFilterFn.getBloomFilterSize(configuration);
+  }
+  /** Replaces the accumulator with a newly constructed filter of the configured size. */
+  @Override
+  public void reset() {
+    bloomFilter = BloomFilterFn.initializeFilter(filterSize);
+
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFn.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFn.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFn.java
new file mode 100644
index 0000000..7d27b33
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFn.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.bloomfilter;
+
+import java.util.Collection;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.hadoop.util.hash.Hash;
+
+/**
+ * The class is responsible for generating keys that are used in a BloomFilter
+ */
+@SuppressWarnings("serial")
+public abstract class BloomFilterFn<S> extends DoFn<S, Pair<String, BloomFilter>> {
+  public static final String CRUNCH_FILTER_SIZE = "crunch.filter.size"; // configuration key for the filter size (default 1024)
+  public static final String CRUNCH_FILTER_NAME = "crunch.filter.name"; // configuration key for the name emitted with the filter
+  private transient BloomFilter bloomFilter = null; // filled by process(), emitted by cleanup()
+  /** Creates the filter, sized from {@link #CRUNCH_FILTER_SIZE} in the configuration. */
+  @Override
+  public void initialize() {
+    super.initialize();
+    bloomFilter = initializeFilter(getBloomFilterSize(getConfiguration()));
+  }
+  /** Adds the keys generated for {@code input} to the filter; null or empty key collections are skipped and nothing is emitted. */
+  @Override
+  public void process(S input, Emitter<Pair<String, BloomFilter>> emitter) {
+    Collection<Key> keys = generateKeys(input);
+    if (CollectionUtils.isNotEmpty(keys))
+      bloomFilter.add(keys);
+  }
+  /** Returns the filter keys to record for {@code input}. */
+  public abstract Collection<Key> generateKeys(S input);
+  /** Emits the accumulated filter, keyed by the name stored under {@link #CRUNCH_FILTER_NAME}. */
+  @Override
+  public void cleanup(Emitter<Pair<String, BloomFilter>> emitter) {
+    String filterName = getConfiguration().get(CRUNCH_FILTER_NAME);
+    emitter.emit(Pair.of(filterName, bloomFilter));
+  }
+  /** Creates a BloomFilter with a vector of {@code size} bits, 5 hash functions and murmur hashing. */
+  static BloomFilter initializeFilter(int size) {
+    return new BloomFilter(size, 5, Hash.MURMUR_HASH);
+  }
+  /** Reads {@link #CRUNCH_FILTER_SIZE} from {@code configuration}, defaulting to 1024. */
+  static int getBloomFilterSize(Configuration configuration) {
+    return configuration.getInt(CRUNCH_FILTER_SIZE, 1024);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/package-info.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/package-info.java
new file mode 100644
index 0000000..8ce703e
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** 
+ * BloomFilters are space and time efficient and Hadoop has support for creating
+ * them. This package provides support for creating BloomFilters in crunch.
+ */
+package org.apache.crunch.contrib.bloomfilter;
+

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/crunch-contrib/src/main/java/org/apache/crunch/contrib/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/package-info.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/package-info.java
new file mode 100644
index 0000000..7f5eee7
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/package-info.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *  The package contains some interesting contributions from the users of
+ *  crunch. These interesting things did not have a place in the core library of crunch
+ *  but they are quite useful for the users of crunch.
+ */
+package org.apache.crunch.contrib;
+

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/crunch-dist/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-dist/pom.xml b/crunch-dist/pom.xml
index d7ff63e..b770685 100644
--- a/crunch-dist/pom.xml
+++ b/crunch-dist/pom.xml
@@ -57,6 +57,10 @@ under the License.
       <groupId>org.apache.crunch</groupId>
       <artifactId>crunch-scrunch</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-contrib</artifactId>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b2a7ba2..3ec2af0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,6 +47,7 @@ under the License.
     <module>crunch</module>
     <module>crunch-hbase</module>
     <module>crunch-test</module>
+    <module>crunch-contrib</module>
     <module>crunch-examples</module>
     <module>crunch-archetype</module>
     <module>crunch-scrunch</module>
@@ -156,6 +157,12 @@ under the License.
         <artifactId>crunch-scrunch</artifactId>
         <version>${project.version}</version>
       </dependency>
+      
+      <dependency>
+        <groupId>org.apache.crunch</groupId>
+        <artifactId>crunch-contrib</artifactId>
+        <version>${project.version}</version>
+      </dependency>
 
       <dependency>
         <groupId>com.google.guava</groupId>