You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by rs...@apache.org on 2012/10/10 18:32:08 UTC
[3/3] CRUNCH-75: Added BloomFilters in crunch-contrib
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java
new file mode 100644
index 0000000..825b445
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.bloomfilter;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.crunch.CombineFn.Aggregator;
+import org.apache.crunch.CombineFn.AggregatorCombineFn;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PObject;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.materialize.pobject.FirstElementPObject;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.bloom.BloomFilter;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Factory class for creating BloomFilters. The APIs require a
+ * {@link BloomFilterFn}, which is responsible for generating the keys that are
+ * added to each filter.
+ */
+public class BloomFilterFactory {
+
+  /**
+   * Generates one BloomFilter for every entry returned by listing
+   * {@code inputPath}, reading each entry as a text file. The returned
+   * {@link PObject} contains a {@link Map} with file names
+   * ({@link Path#getName()}) as keys and the corresponding filters as values.
+   *
+   * @param inputPath the path whose listing supplies the text files to read
+   * @param filterFn generates the keys added to each filter
+   * @return a {@link PObject} holding the file-name-to-filter map
+   * @throws IOException if {@code inputPath} cannot be listed, or if the
+   *     listing is missing or empty
+   */
+  public static PObject<Map<String, BloomFilter>> createFilter(Path inputPath, BloomFilterFn<String> filterFn)
+      throws IOException {
+    MRPipeline pipeline = new MRPipeline(BloomFilterFactory.class);
+    FileStatus[] listStatus = FileSystem.get(pipeline.getConfiguration()).listStatus(inputPath);
+    // Some Hadoop versions return null for a missing path. Either a null or an
+    // empty listing would otherwise leave filterTable null and fail with an NPE
+    // at asMap().
+    if (listStatus == null || listStatus.length == 0) {
+      throw new IOException("No files found under input path: " + inputPath);
+    }
+    PTable<String, BloomFilter> filterTable = null;
+    for (FileStatus fileStatus : listStatus) {
+      Path path = fileStatus.getPath();
+      String fileName = path.getName();
+      PCollection<String> readTextFile = pipeline.readTextFile(path.toString());
+      // This single pipeline Configuration is overwritten on every iteration,
+      // and the jobs only run when the PObject is read. BloomFilterFn therefore
+      // sees the last value set here. Setting it only guarantees a non-null
+      // key; the real per-file key is applied by FilterNameFn below.
+      pipeline.getConfiguration().set(BloomFilterFn.CRUNCH_FILTER_NAME, fileName);
+      PTable<String, BloomFilter> fileFilter = createFilterTable(readTextFile, filterFn);
+      PTable<String, BloomFilter> currentTable = fileFilter.parallelDo(new FilterNameFn(fileName),
+          fileFilter.getPTableType());
+      filterTable = (filterTable == null) ? currentTable : filterTable.union(currentTable);
+    }
+    return filterTable.asMap();
+  }
+
+  /**
+   * Generates a single BloomFilter covering every element of {@code collection}.
+   *
+   * @param collection the elements whose keys are added to the filter
+   * @param filterFn generates the keys added to the filter for each element
+   * @return a {@link PObject} holding the resulting filter
+   */
+  public static <T> PObject<BloomFilter> createFilter(PCollection<T> collection, BloomFilterFn<T> filterFn) {
+    collection.getPipeline().getConfiguration().set(BloomFilterFn.CRUNCH_FILTER_NAME, collection.getName());
+    return new FirstElementPObject<BloomFilter>(createFilterTable(collection, filterFn).values());
+  }
+
+  /**
+   * Runs {@code filterFn} over the collection. BloomFilterFn emits one partial
+   * filter from each cleanup. All partial filters sharing a key are then
+   * grouped into a single partition and OR-ed together by
+   * {@link BloomFilterAggregator}.
+   */
+  private static <T> PTable<String, BloomFilter> createFilterTable(PCollection<T> collection, BloomFilterFn<T> filterFn) {
+    PTypeFamily tf = collection.getTypeFamily();
+    PTable<String, BloomFilter> table = collection.parallelDo(filterFn,
+        tf.tableOf(tf.strings(), Writables.writables(BloomFilter.class)));
+    return table.groupByKey(1).combineValues(new AggregatorCombineFn<String, BloomFilter>(new BloomFilterAggregator()));
+  }
+
+  /**
+   * Replaces the key of every pair with a filter name captured when the
+   * pipeline is planned. This avoids depending on shared Configuration state
+   * at run time.
+   */
+  @SuppressWarnings("serial")
+  private static class FilterNameFn extends MapFn<Pair<String, BloomFilter>, Pair<String, BloomFilter>> {
+    private final String filterName;
+
+    FilterNameFn(String filterName) {
+      this.filterName = filterName;
+    }
+
+    @Override
+    public Pair<String, BloomFilter> map(Pair<String, BloomFilter> input) {
+      return Pair.of(filterName, input.second());
+    }
+  }
+
+}
+
+@SuppressWarnings("serial")
+class BloomFilterAggregator implements Aggregator<BloomFilter> {
+ private transient BloomFilter bloomFilter = null;
+ private transient int filterSize;
+
+ @Override
+ public void update(BloomFilter value) {
+ bloomFilter.or(value);
+ }
+
+ @Override
+ public Iterable<BloomFilter> results() {
+ return ImmutableList.of(bloomFilter);
+ }
+
+ @Override
+ public void initialize(Configuration configuration) {
+ filterSize = BloomFilterFn.getBloomFilterSize(configuration);
+ }
+
+ @Override
+ public void reset() {
+ bloomFilter = BloomFilterFn.initializeFilter(filterSize);
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFn.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFn.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFn.java
new file mode 100644
index 0000000..7d27b33
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFn.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.bloomfilter;
+
+import java.util.Collection;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.hadoop.util.hash.Hash;
+
+/**
+ * The class is responsible for generating keys that are used in a BloomFilter
+ */
+@SuppressWarnings("serial")
+public abstract class BloomFilterFn<S> extends DoFn<S, Pair<String, BloomFilter>> {
+ public static final String CRUNCH_FILTER_SIZE = "crunch.filter.size";
+ public static final String CRUNCH_FILTER_NAME = "crunch.filter.name";
+ private transient BloomFilter bloomFilter = null;
+
+ @Override
+ public void initialize() {
+ super.initialize();
+ bloomFilter = initializeFilter(getBloomFilterSize(getConfiguration()));
+ }
+
+ @Override
+ public void process(S input, Emitter<Pair<String, BloomFilter>> emitter) {
+ Collection<Key> keys = generateKeys(input);
+ if (CollectionUtils.isNotEmpty(keys))
+ bloomFilter.add(keys);
+ }
+
+ public abstract Collection<Key> generateKeys(S input);
+
+ @Override
+ public void cleanup(Emitter<Pair<String, BloomFilter>> emitter) {
+ String filterName = getConfiguration().get(CRUNCH_FILTER_NAME);
+ emitter.emit(Pair.of(filterName, bloomFilter));
+ }
+
+ static BloomFilter initializeFilter(int size) {
+ return new BloomFilter(size, 5, Hash.MURMUR_HASH);
+ }
+
+ static int getBloomFilterSize(Configuration configuration) {
+ return configuration.getInt(CRUNCH_FILTER_SIZE, 1024);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/package-info.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/package-info.java
new file mode 100644
index 0000000..8ce703e
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * BloomFilters are space- and time-efficient probabilistic data structures, and
+ * Hadoop has support for creating them. This package provides support for
+ * creating BloomFilters in Crunch.
+ */
+package org.apache.crunch.contrib.bloomfilter;
+
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/crunch-contrib/src/main/java/org/apache/crunch/contrib/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/package-info.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/package-info.java
new file mode 100644
index 0000000..7f5eee7
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/package-info.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains contributions from users of Crunch. These components
+ * do not have a place in the core Crunch library, but they are quite useful
+ * to Crunch users.
+ */
+package org.apache.crunch.contrib;
+
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/crunch-dist/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-dist/pom.xml b/crunch-dist/pom.xml
index d7ff63e..b770685 100644
--- a/crunch-dist/pom.xml
+++ b/crunch-dist/pom.xml
@@ -57,6 +57,10 @@ under the License.
<groupId>org.apache.crunch</groupId>
<artifactId>crunch-scrunch</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.crunch</groupId>
+ <artifactId>crunch-contrib</artifactId>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b2a7ba2..3ec2af0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,6 +47,7 @@ under the License.
<module>crunch</module>
<module>crunch-hbase</module>
<module>crunch-test</module>
+ <module>crunch-contrib</module>
<module>crunch-examples</module>
<module>crunch-archetype</module>
<module>crunch-scrunch</module>
@@ -156,6 +157,12 @@ under the License.
<artifactId>crunch-scrunch</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.crunch</groupId>
+ <artifactId>crunch-contrib</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>com.google.guava</groupId>