You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by da...@apache.org on 2016/01/13 00:05:20 UTC

[1/2] crunch git commit: Java 8 lambda support for Apache Crunch.

Repository: crunch
Updated Branches:
  refs/heads/master f8920d355 -> 7d7af4ef4


http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
new file mode 100644
index 0000000..0b4e4fa
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Target;
+import org.apache.crunch.lambda.fn.SFunction;
+import org.apache.crunch.lambda.fn.SPredicate;
+import org.apache.crunch.lib.join.DefaultJoinStrategy;
+import org.apache.crunch.lib.join.JoinStrategy;
+import org.apache.crunch.lib.join.JoinType;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import java.util.Collection;
+
+/**
+ * Java 8 friendly version of the {@link PTable} interface, allowing distributed operations to be expressed in
+ * terms of lambda expressions and method references, instead of creating a new class implementation for each operation.
+ * @param <K> key type for this table
+ * @param <V> value type for this table
+ */
+public interface LTable<K, V> extends LCollection<Pair<K, V>> {
+    /**
+     * Get the underlying {@link PTable} for this LTable
+     */
+    PTable<K, V> underlying();
+
+    /**
+     * Group this table by key to yield a {@link LGroupedTable}
+     */
+    default LGroupedTable<K, V> groupByKey() {
+        return factory().wrap(underlying().groupByKey());
+    }
+
+    /**
+     * Group this table by key, passing {@code numReducers} to the underlying table, to yield a {@link LGroupedTable}
+     */
+    default LGroupedTable<K, V> groupByKey(int numReducers) {
+        return factory().wrap(underlying().groupByKey(numReducers));
+    }
+
+    /**
+     * Group this table by key, applying the given {@link GroupingOptions}, to yield a {@link LGroupedTable}
+     */
+    default LGroupedTable<K, V> groupByKey(GroupingOptions opts) {
+        return factory().wrap(underlying().groupByKey(opts));
+    }
+
+    /**
+     * Get an {@link LCollection} containing just the keys from this table
+     */
+    default LCollection<K> keys() {
+        return factory().wrap(underlying().keys());
+    }
+
+    /**
+     * Get an {@link LCollection} containing just the values from this table
+     */
+    default LCollection<V> values() {
+        return factory().wrap(underlying().values());
+    }
+
+    /**
+     * Transform the keys of this table using the given function; {@code pType} is the type of the new keys
+     */
+    default <T> LTable<T, V> mapKeys(SFunction<K, T> fn, PType<T> pType) {
+        return parallelDo(
+                ctx -> ctx.emit(Pair.of(fn.apply(ctx.element().first()), ctx.element().second())),
+                ptf().tableOf(pType, valueType()));
+    }
+
+    /**
+     * Transform the values of this table using the given function; {@code pType} is the type of the new values
+     */
+    default <T> LTable<K, T> mapValues(SFunction<V, T> fn, PType<T> pType) {
+        return parallelDo(
+                ctx -> ctx.emit(Pair.of(ctx.element().first(), fn.apply(ctx.element().second()))),
+                ptf().tableOf(keyType(), pType));
+    }
+
+    /**
+     * Join this table to another {@link LTable} which has the same key type using the provided {@link JoinType} and
+     * {@link JoinStrategy}
+     */
+    default <U> LTable<K, Pair<V, U>> join(LTable<K, U> other, JoinType joinType, JoinStrategy<K, V, U> joinStrategy) {
+        return factory().wrap(joinStrategy.join(underlying(), other.underlying(), joinType));
+    }
+
+    /**
+     * Join this table to another {@link LTable} which has the same key type using the provided {@link JoinType} and
+     * the {@link DefaultJoinStrategy} (reduce-side join).
+     */
+    default <U> LTable<K, Pair<V, U>> join(LTable<K, U> other, JoinType joinType) {
+        return join(other, joinType, new DefaultJoinStrategy<>());
+    }
+
+    /**
+     * Inner join this table to another {@link LTable} which has the same key type using a reduce-side join
+     */
+    default <U> LTable<K, Pair<V, U>> join(LTable<K, U> other) {
+        return join(other, JoinType.INNER_JOIN);
+    }
+
+    /**
+     * Cogroup this table with another {@link LTable} with the same key type, collecting the values from
+     * each side into a pair of collections per key.
+     */
+    default <U> LTable<K, Pair<Collection<V>, Collection<U>>> cogroup(LTable<K, U> other) {
+        return factory().wrap(underlying().cogroup(other.underlying()));
+    }
+
+    /**
+     * Get the underlying {@link PTableType} used to serialize key/value pairs in this table
+     */
+    default PTableType<K, V> pType() { return underlying().getPTableType(); }
+
+    /**
+     * Get a {@link PType} which can be used to serialize the key part of this table
+     */
+    default PType<K> keyType() {
+        return underlying().getKeyType();
+    }
+
+    /**
+     * Get a {@link PType} which can be used to serialize the value part of this table
+     */
+    default PType<V> valueType() {
+        return underlying().getValueType();
+    }
+
+    /**
+     * Write this table to the {@link Target} supplied and return this table so that calls can be chained.
+     */
+    default LTable<K, V> write(Target target) {
+        underlying().write(target);
+        return this;
+    }
+
+    /**
+     * Write this table to the {@link Target} supplied using the given write mode, and return this table.
+     */
+    default LTable<K, V> write(Target target, Target.WriteMode writeMode) {
+        underlying().write(target, writeMode);
+        return this;
+    }
+
+    /** {@inheritDoc} NOTE(review): this lambda (like the 3 overloads below) never calls ctx.emit; confirm intent. */
+    default LTable<K, V> increment(Enum<?> counter) {
+        return parallelDo(ctx -> ctx.increment(counter), pType());
+    }
+
+    /** {@inheritDoc} */
+    default LTable<K, V> increment(String counterGroup, String counterName) {
+        return parallelDo(ctx -> ctx.increment(counterGroup, counterName), pType());
+    }
+
+    /** {@inheritDoc} */
+    default LTable<K, V> incrementIf(Enum<?> counter, SPredicate<Pair<K, V>> condition) {
+        return parallelDo(ctx -> {
+            if (condition.test(ctx.element())) ctx.increment(counter);
+        }, pType());
+    }
+
+    /** {@inheritDoc} */
+    default LTable<K, V> incrementIf(String counterGroup, String counterName, SPredicate<Pair<K, V>> condition) {
+        return parallelDo(ctx -> {
+            if (condition.test(ctx.element())) ctx.increment(counterGroup, counterName);
+        }, pType());
+    }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java
new file mode 100644
index 0000000..07fad2b
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+
+/**
+ * Entry point for the crunch-lambda API. Use this to create {@link LCollection}, {@link LTable} and
+ * {@link LGroupedTable} objects from their corresponding {@link PCollection}, {@link PTable} and {@link PGroupedTable}
+ * types.
+ *
+ * <p>The crunch-lambda API allows you to write Crunch pipelines using lambda expressions and method references instead
+ * of creating classes (anonymous, inner, or top level) for each operation that needs to be completed. Many pipelines
+ * are composed of a large number of simple operations, rather than a small number of complex operations, making this
+ * strategy much more efficient to code and easy to read for those able to use Java 8 in their distributed computation
+ * environments.</p>
+ *
+ * <p>You use the API by wrapping your Crunch type into an L-type object. The static {@code wrap} methods of this
+ * class do that. You can then use the lambda API methods on the L-type object, yielding more L-type objects. If at
+ * any point you need to go back to the standard Crunch world (for compatibility with existing code or complex use
+ * cases), you can call {@code underlying()} on an L-type object to get a Crunch object.</p>
+ *
+ * <p>Example (the obligatory wordcount):</p>
+ *
+ * <pre>{@code
+ * Pipeline pipeline = new MRPipeline(getClass());
+ * LCollection<String> inputText = Lambda.wrap(pipeline.readTextFile("/path/to/input/file"));
+ * inputText.flatMap(line -> Arrays.stream(line.split(" ")), Writables.strings())
+ *          .count()
+ *          .map(wordCountPair -> wordCountPair.first() + ": " + wordCountPair.second(), Writables.strings())
+ *          .write(To.textFile("/path/to/output/file"));
+ * pipeline.run();
+ * }</pre>
+ */
+public class Lambda {
+    /** Single shared factory behind every {@code wrap} overload; it is never reassigned, so it is {@code final}. */
+    private static final LCollectionFactory INSTANCE = new LCollectionFactoryImpl();
+
+    public static <S> LCollection<S> wrap(PCollection<S> collection) { return INSTANCE.wrap(collection); }
+    public static <K, V> LTable<K, V> wrap(PTable<K, V> collection) { return INSTANCE.wrap(collection); }
+    public static <K, V> LGroupedTable<K, V> wrap(PGroupedTable<K, V> collection) { return INSTANCE.wrap(collection); }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java
new file mode 100644
index 0000000..6e5030f
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.BiConsumer;
+
+/**
+ * Serializable version of the Java {@link BiConsumer} functional interface; it declares no members of its own.
+ */
+@FunctionalInterface
+public interface SBiConsumer<K, V> extends BiConsumer<K, V>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java
new file mode 100644
index 0000000..5aac5bc
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.BiFunction;
+
+/**
+ * Serializable version of the Java {@link BiFunction} functional interface; it declares no members of its own.
+ */
+@FunctionalInterface
+public interface SBiFunction<K, V, T> extends BiFunction<K, V, T>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java
new file mode 100644
index 0000000..d1e4cc0
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.BinaryOperator;
+
+/**
+ * Serializable version of the Java {@link BinaryOperator} functional interface; it declares no members of its own.
+ */
+@FunctionalInterface
+public interface SBinaryOperator<T> extends BinaryOperator<T>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java
new file mode 100644
index 0000000..90f4a99
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.Consumer;
+
+/**
+ * Serializable version of the Java {@link Consumer} functional interface; it declares no members of its own.
+ */
+@FunctionalInterface
+public interface SConsumer<T> extends Consumer<T>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java
new file mode 100644
index 0000000..d120efe
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+/**
+ * Serializable version of the Java {@link Function} functional interface; it declares no members of its own.
+ */
+@FunctionalInterface
+public interface SFunction<S, T> extends Function<S, T>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java
new file mode 100644
index 0000000..9e90bab
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.Predicate;
+
+/**
+ * Serializable version of the Java {@link Predicate} functional interface; it declares no members of its own.
+ */
+@FunctionalInterface
+public interface SPredicate<T> extends Predicate<T>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java
new file mode 100644
index 0000000..eea254a
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+
+/**
+ * Serializable version of the Java {@link Supplier} functional interface; it declares no members of its own.
+ */
+@FunctionalInterface
+public interface SSupplier<T> extends Supplier<T>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java
new file mode 100644
index 0000000..ad18232
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Serializable versions of the functional interfaces that ship with Java 8.
+ */
+package org.apache.crunch.lambda.fn;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java
new file mode 100644
index 0000000..9c03148
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <p>Alternative Crunch API using Java 8 features to allow construction of pipelines using lambda functions and method
+ * references. It works by wrapping standard Java {@link org.apache.crunch.PCollection},
+ * {@link org.apache.crunch.PTable} and {@link org.apache.crunch.PGroupedTable} instances into the corresponding
+ * {@link org.apache.crunch.lambda.LCollection}, {@link org.apache.crunch.lambda.LTable} and
+ * {@link org.apache.crunch.lambda.LGroupedTable} classes.</p>
+ *
+ * <p>The static class {@link org.apache.crunch.lambda.Lambda} has methods to create these. Please also see the Javadocs
+ * for {@link org.apache.crunch.lambda.Lambda} for usage examples.</p>
+ */
+package org.apache.crunch.lambda;
+

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java
new file mode 100644
index 0000000..b819d0d
--- /dev/null
+++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.crunch.lambda.TestCommon.*;
+import static org.apache.crunch.lambda.TypedRecord.rec;
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.junit.Assert.*;
+
+public class LCollectionTest {
+    /** Fixture: wraps a new in-memory collection of five records, each built as rec(key, name, value). */
+    private LCollection<TypedRecord> lc() {
+        return Lambda.wrap(MemPipeline.typedCollectionOf(Avros.reflects(TypedRecord.class),
+                rec(14, "Alice", 101L),
+                rec(25, "Bo B", 102L),
+                rec(21, "Char Lotte", 103L),
+                rec(28, "David", 104L),
+                rec(31, "Erik", 105L)));
+    }
+
+    @Test
+    public void testParallelDo() throws Exception {
+        LCollection<String> result = lc().parallelDo(ctx -> { if (ctx.element().key > 26) ctx.emit(ctx.element().name); }, strings());
+        assertCollectionOf(result, "David", "Erik");
+    }
+
+    @Test
+    public void testParallelDoPair() throws Exception {
+        LTable<Integer, String> result = lc().parallelDo(ctx -> {
+            if (ctx.element().key > 26) ctx.emit(Pair.of(ctx.element().key, ctx.element().name)); }, tableOf(ints(), strings()));
+        assertCollectionOf(result, Pair.of(28, "David"), Pair.of(31, "Erik"));
+    }
+
+    // map: one output per fixture record, for both a plain PType and a table (Pair) type.
+    @Test
+    public void testMap() throws Exception {
+        assertCollectionOf(lc().map(r -> r.key, ints()), 14, 25, 21, 28, 31);
+    }
+
+    @Test
+    public void testMapPair() throws Exception {
+        assertCollectionOf(lc().map(r -> Pair.of(r.key, r.value), tableOf(ints(), longs())),
+                Pair.of(14, 101L),
+                Pair.of(25, 102L),
+                Pair.of(21, 103L),
+                Pair.of(28, 104L),
+                Pair.of(31, 105L));
+    }
+
+    @Test
+    public void testFlatMap() throws Exception {
+        assertCollectionOf(
+                lc().flatMap(s -> Arrays.stream(s.name.split(" ")), strings()),
+                "Alice", "Bo", "B", "Char", "Lotte", "David", "Erik");
+    }
+
+    // NOTE(review): expected values are not in fixture order, so assertCollectionOf presumably ignores order.
+    @Test
+    public void testFilterMap() throws Exception {
+        Map<String, String> lookupMap = ImmutableMap.of("Erik", "BOOM", "Alice", "POW");
+        assertCollectionOf(
+                lc().filterMap(r -> lookupMap.containsKey(r.name) ? Optional.of(lookupMap.get(r.name)) : Optional.empty(), strings()),
+                "BOOM", "POW"
+        );
+    }
+
+    @Test
+    public void testFilter() throws Exception {
+        assertCollectionOf(lc().filter(r -> r.key == 21), rec(21, "Char Lotte", 103L));
+    }
+
+    // Counter tests read MemPipeline.getCounters(); each uses its own counter name ("world" / "conditional_world").
+    @Test
+    public void testIncrement() throws Exception {
+        lc().increment("hello", "world");
+        long value = MemPipeline.getCounters().findCounter("hello", "world").getValue();
+        assertEquals(5L, value); // five fixture records
+    }
+
+    @Test
+    public void testIncrementIf() throws Exception {
+        lc().incrementIf("hello", "conditional_world", r -> r.key < 25);
+        long value = MemPipeline.getCounters().findCounter("hello", "conditional_world").getValue();
+        assertEquals(2L, value); // fixture keys 14 and 21 are the only ones below 25
+    }
+
+    @Test
+    public void testBy() throws Exception {
+        assertCollectionOf(
+                lc().filter(r -> r.key == 21).by(r -> r.key, ints()),
+                Pair.of(21, rec(21, "Char Lotte", 103L)));
+    }
+
+    @Test
+    public void testCount() throws Exception {
+        assertCollectionOf(
+                Lambda.wrap(MemPipeline.typedCollectionOf(strings(), "a", "a", "a", "b", "b")).count(),
+                Pair.of("a", 3L),
+                Pair.of("b", 2L)
+        );
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java
new file mode 100644
index 0000000..043387e
--- /dev/null
+++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.crunch.Pair;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.crunch.lambda.TestCommon.assertCollectionOf;
+import static org.apache.crunch.types.avro.Avros.*;
+
+
+public class LGroupedTableTest {
+
+    LGroupedTable<String, Integer> lgt = Lambda.wrap(MemPipeline.typedTableOf(tableOf(strings(), ints()),
+            "a", 2,
+            "a", 3,
+            "b", 5,
+            "c", 7,
+            "c", 11,
+            "c", 13,
+            "c", 13))
+            .groupByKey();
+
+    @Test
+    public void testCombineValues() throws Exception {
+        assertCollectionOf(lgt.combineValues(Aggregators.MAX_INTS()),
+                Pair.of("a", 3),
+                Pair.of("b", 5),
+                Pair.of("c", 13));
+    }
+
+    @Test
+    public void testCombineValues1() throws Exception {
+        assertCollectionOf(lgt.combineValues(() -> Integer.MIN_VALUE, Integer::max, Collections::singleton),
+                Pair.of("a", 3),
+                Pair.of("b", 5),
+                Pair.of("c", 13));
+    }
+
+    @Test
+    public void testMapValues() throws Exception {
+        assertCollectionOf(lgt.mapValues(vs -> vs.map(i -> i.toString()).reduce((a, b) -> a + "," + b).get(), strings()),
+                Pair.of("a", "2,3"),
+                Pair.of("b", "5"),
+                Pair.of("c", "7,11,13,13"));
+    }
+
+    @Test
+    public void testCollectValues() throws Exception {
+        assertCollectionOf(lgt.collectValues(ArrayList::new, Collection::add, collections(ints())),
+                Pair.of("a", ImmutableList.of(2,3)),
+                Pair.of("b", ImmutableList.of(5)),
+                Pair.of("c", ImmutableList.of(7, 11, 13, 13)));
+    }
+
+    @Test
+    public void testCollectAllValues() throws Exception {
+        assertCollectionOf(lgt.collectAllValues(),
+                Pair.of("a", ImmutableList.of(2,3)),
+                Pair.of("b", ImmutableList.of(5)),
+                Pair.of("c", ImmutableList.of(7, 11, 13, 13)));
+    }
+
+    @Test
+    public void testCollectUniqueValues() throws Exception {
+        assertCollectionOf(lgt.collectUniqueValues(),
+                Pair.of("a", ImmutableSet.of(2, 3)),
+                Pair.of("b", ImmutableSet.of(5)),
+                Pair.of("c", ImmutableSet.of(7, 11, 13)));
+    }
+
+    @Test
+    public void testReduceValues() throws Exception {
+        assertCollectionOf(lgt.reduceValues((a, b) -> a * b),
+                Pair.of("a", 6),
+                Pair.of("b", 5),
+                Pair.of("c", 7 * 11 * 13 * 13)
+                );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java
new file mode 100644
index 0000000..f66ada5
--- /dev/null
+++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import static org.apache.crunch.lambda.TestCommon.assertCollectionOf;
+import static org.apache.crunch.types.avro.Avros.*;
+
+
+public class LTableTest {
+
+    private LTable<String, Integer> lt1 = Lambda.wrap(MemPipeline.typedTableOf(tableOf(strings(), ints()),
+            "a", 2,
+            "a", 3,
+            "b", 5,
+            "c", 7,
+            "c", 11,
+            "c", 13,
+            "c", 13));
+
+    private LTable<String, Long> lt2 = Lambda.wrap(MemPipeline.typedTableOf(tableOf(strings(), longs()),
+            "a", 101L,
+            "b", 102L,
+            "c", 103L
+            ));
+
+    @Test
+    public void testKeys() throws Exception {
+        assertCollectionOf(lt1.keys(), "a", "a", "b", "c", "c", "c", "c");
+    }
+
+    @Test
+    public void testValues() throws Exception {
+        assertCollectionOf(lt2.values(), 101L, 102L, 103L);
+    }
+
+    @Test
+    public void testMapKeys() throws Exception {
+        assertCollectionOf(lt2.mapKeys(String::toUpperCase, strings()),
+                Pair.of("A", 101L),
+                Pair.of("B", 102L),
+                Pair.of("C", 103L)
+                );
+    }
+
+    @Test
+    public void testMapValues() throws Exception {
+        assertCollectionOf(lt2.mapValues(v -> v * 2, longs()),
+                Pair.of("a", 202L),
+                Pair.of("b", 204L),
+                Pair.of("c", 206L)
+        );
+    }
+
+    @Test
+    public void testJoin() throws Exception {
+        assertCollectionOf(lt1.join(lt2).values(),
+                Pair.of(2, 101L),
+                Pair.of(3, 101L),
+                Pair.of(5, 102L),
+                Pair.of(7, 103L),
+                Pair.of(11, 103L),
+                Pair.of(13, 103L),
+                Pair.of(13, 103L));
+    }
+
+    @Test
+    public void testCogroup() throws Exception {
+        assertCollectionOf(lt1.cogroup(lt2).values(),
+                Pair.of(ImmutableList.of(2, 3), ImmutableList.of(101L)),
+                Pair.of(ImmutableList.of(5), ImmutableList.of(102L)),
+                Pair.of(ImmutableList.of(7, 11, 13, 13), ImmutableList.of(103L))
+                );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java
new file mode 100644
index 0000000..02101ab
--- /dev/null
+++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import com.google.common.collect.Sets;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestCommon {
+    @SafeVarargs
+    public static <T> void assertCollectionOf(LCollection<T> actual, T... expected) {
+        Set<T> actualSet = actual.materialize().collect(Collectors.toSet());
+        Set<T> expectedSet = Sets.newHashSet(expected);
+        assertEquals(expectedSet, actualSet);
+    }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java
new file mode 100644
index 0000000..42540de
--- /dev/null
+++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+public class TypedRecord {
+    public int key;
+    public String name;
+    public long value;
+    public static TypedRecord rec(int key, String name, long value) {
+        TypedRecord record = new TypedRecord();
+        record.key = key;
+        record.name = name;
+        record.value = value;
+        return record;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        TypedRecord that = (TypedRecord) o;
+
+        if (key != that.key) return false;
+        if (value != that.value) return false;
+        return name != null ? name.equals(that.name) : that.name == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = key;
+        result = 31 * result + (name != null ? name.hashCode() : 0);
+        result = 31 * result + (int) (value ^ (value >>> 32));
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0758e43..b628a77 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,17 @@ under the License.
     <module>crunch-hive</module>
     <module>crunch-dist</module>
   </modules>
+  <profiles>
+    <profile>
+      <id>java-8</id>
+      <activation>
+        <jdk>[1.8,]</jdk>
+      </activation>
+      <modules>
+        <module>crunch-lambda</module>
+      </modules>
+    </profile>
+  </profiles>
 
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -163,6 +174,12 @@ under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.crunch</groupId>
+        <artifactId>crunch-lambda</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-client</artifactId>
         <version>${hadoop.version}</version>


[2/2] crunch git commit: Java 8 lambda support for Apache Crunch.

Posted by da...@apache.org.
Java 8 lambda support for Apache Crunch.

Remove lambda support from crunch-core, and instead implement a new module called crunch-lambda.
This will allow full use of Java 8 features in implementing support for lambda expressions and
method references, without requiring a dependency on Java 8 for crunch-core. Pthings are wrapped
into analogous Lthings which can be operated on with an API inspired both by the existing Crunch
API and the Java 8 streams API.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/7d7af4ef
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/7d7af4ef
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/7d7af4ef

Branch: refs/heads/master
Commit: 7d7af4ef43b122cc03cee721b9106d174d71d435
Parents: f8920d3
Author: David Whiting <da...@apache.org>
Authored: Sat Jan 2 11:28:34 2016 +0100
Committer: David Whiting <da...@apache.org>
Committed: Sun Jan 10 21:29:49 2016 +0100

----------------------------------------------------------------------
 .../org/apache/crunch/MultiStagePlanningIT.java |   2 +-
 .../it/java/org/apache/crunch/PageRankIT.java   |   2 +-
 .../it/java/org/apache/crunch/WordCountIT.java  |  12 +-
 .../src/main/java/org/apache/crunch/IDoFn.java  |  49 ----
 .../main/java/org/apache/crunch/IFilterFn.java  |  27 --
 .../main/java/org/apache/crunch/IFlatMapFn.java |  28 ---
 .../src/main/java/org/apache/crunch/IMapFn.java |  27 --
 .../java/org/apache/crunch/PCollection.java     |  96 +-------
 .../java/org/apache/crunch/PGroupedTable.java   |  11 -
 .../src/main/java/org/apache/crunch/PTable.java |  24 --
 .../java/org/apache/crunch/fn/IFnHelpers.java   | 149 -----------
 .../impl/dist/collect/BaseGroupedTable.java     |   9 -
 .../impl/dist/collect/PCollectionImpl.java      |  92 +------
 .../crunch/impl/dist/collect/PTableBase.java    |  22 --
 .../crunch/impl/mem/collect/MemCollection.java  |  89 -------
 .../impl/mem/collect/MemGroupedTable.java       |   7 -
 .../crunch/impl/mem/collect/MemTable.java       |  23 --
 crunch-dist/pom.xml                             |   4 +
 crunch-lambda/pom.xml                           |  67 +++++
 .../org/apache/crunch/lambda/LAggregator.java   |  57 +++++
 .../org/apache/crunch/lambda/LCollection.java   | 244 +++++++++++++++++++
 .../crunch/lambda/LCollectionFactory.java       |  44 ++++
 .../crunch/lambda/LCollectionFactoryImpl.java   |  70 ++++++
 .../java/org/apache/crunch/lambda/LDoFn.java    |  31 +++
 .../org/apache/crunch/lambda/LDoFnContext.java  |  52 ++++
 .../org/apache/crunch/lambda/LDoFnWrapper.java  | 106 ++++++++
 .../org/apache/crunch/lambda/LGroupedTable.java | 162 ++++++++++++
 .../java/org/apache/crunch/lambda/LTable.java   | 188 ++++++++++++++
 .../java/org/apache/crunch/lambda/Lambda.java   |  59 +++++
 .../apache/crunch/lambda/fn/SBiConsumer.java    |  28 +++
 .../apache/crunch/lambda/fn/SBiFunction.java    |  28 +++
 .../crunch/lambda/fn/SBinaryOperator.java       |  28 +++
 .../org/apache/crunch/lambda/fn/SConsumer.java  |  28 +++
 .../org/apache/crunch/lambda/fn/SFunction.java  |  28 +++
 .../org/apache/crunch/lambda/fn/SPredicate.java |  28 +++
 .../org/apache/crunch/lambda/fn/SSupplier.java  |  28 +++
 .../apache/crunch/lambda/fn/package-info.java   |  22 ++
 .../org/apache/crunch/lambda/package-info.java  |  30 +++
 .../apache/crunch/lambda/LCollectionTest.java   | 128 ++++++++++
 .../apache/crunch/lambda/LGroupedTableTest.java | 103 ++++++++
 .../org/apache/crunch/lambda/LTableTest.java    |  94 +++++++
 .../org/apache/crunch/lambda/TestCommon.java    |  34 +++
 .../org/apache/crunch/lambda/TypedRecord.java   |  52 ++++
 pom.xml                                         |  17 ++
 44 files changed, 1771 insertions(+), 658 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java b/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
index 38211a7..a7b7d48 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
@@ -60,7 +60,7 @@ public class MultiStagePlanningIT implements Serializable {
 
         PTable<String, String> addressesTable = pipeline.readTextFile(addressesFile)
                 .parallelDo("Split addresses", new StringToPairMapFn(), tableOf(strings(), strings()))
-                .filter(new IFilterFn<Pair<String, String>>() {
+                .filter(new FilterFn<Pair<String, String>>() {
                     @Override
                     public boolean accept(Pair<String, String> input) {
                         // This is odd but it is the simpler way of simulating this would take longer than

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
index b30465d..701f78a 100644
--- a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
@@ -130,7 +130,7 @@ public class PageRankIT {
     }, ptf.tableOf(ptf.strings(), ptf.floats()));
 
     return input.cogroup(outbound).mapValues(
-        new IMapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
+        new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
           @Override
           public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) {
             PageRankData prd = Iterables.getOnlyElement(input.first());

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
index 4c77c41..e0bd719 100644
--- a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
@@ -39,6 +39,7 @@ import org.apache.crunch.types.PTypes;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -55,17 +56,18 @@ public class WordCountIT {
   }
 
   public static PTable<String, Long> wordCount(PCollection<String> words, PTypeFamily typeFamily) {
-    return Aggregate.count(words.parallelDo(new IDoFn<String, String>() {
+    return Aggregate.count(words.parallelDo(new DoFn<String, String>() {
       @Override
-      public void process(Context<String, String> context) {
-        List<String> words = Arrays.asList(context.element().split("\\s+"));
+      public void process(String input, Emitter<String> emitter) {
+        List<String> words = Arrays.asList(input.split("\\s+"));
         for (String word : words) {
           if ("and".equals(word)) {
-            context.increment(WordCountStats.ANDS);
+            increment(WordCountStats.ANDS);
           }
-          context.emit(word);
+          emitter.emit(word);
         }
       }
+
     }, typeFamily.strings()));
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IDoFn.java b/crunch-core/src/main/java/org/apache/crunch/IDoFn.java
deleted file mode 100644
index b393f43..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/IDoFn.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-import java.io.Serializable;
-
-/**
- * A Java lambdas friendly version of the {@link DoFn} class.
- */
-public interface IDoFn<S, T> extends Serializable {
-
-  void process(Context<S, T> context);
-
-  public interface Context<S, T> {
-    S element();
-
-    void emit(T t);
-
-    TaskInputOutputContext getContext();
-
-    Configuration getConfiguration();
-
-    void increment(String groupName, String counterName);
-
-    void increment(String groupName, String counterName, long value);
-
-    void increment(Enum<?> counterName);
-
-    void increment(Enum<?> counterName, long value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java b/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java
deleted file mode 100644
index bb8a03d..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.Serializable;
-
-/**
- * A Java lambdas friendly version of the {@link org.apache.crunch.FilterFn} class.
- */
-public interface IFilterFn<S> extends Serializable {
-  boolean accept(S input);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java b/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java
deleted file mode 100644
index a2b85c4..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.Serializable;
-
-/**
- * A Java lambdas friendly interface for writing business logic against {@code PCollection}s
- * that take in a single input record and return 0 to N output records via an {@code Iterable}.
- */
-public interface IFlatMapFn<S, T> extends Serializable {
-  Iterable<T> process(S input);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IMapFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IMapFn.java b/crunch-core/src/main/java/org/apache/crunch/IMapFn.java
deleted file mode 100644
index 3c06d9e..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/IMapFn.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.Serializable;
-
-/**
- * A Java lambdas friendly version of the {@link org.apache.crunch.MapFn} class.
- */
-public interface IMapFn<S, T> extends Serializable {
-  public T map(S input);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
index 5d072e6..8043349 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
@@ -132,95 +132,6 @@ public interface PCollection<S> {
   <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type,
       ParallelDoOptions options);
 
-  /**
-   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
-   */
-  <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type);
-
-  /**
-   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
-   */
-  <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
-  /**
-   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
-   */
-  <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type);
-
-  /**
-   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
-   */
-  <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
-  /**
-   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
-   */
-  <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options);
-
-  /**
-   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
-   */
-  <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options);
-
-  /**
-   * For each element of this {@code PCollection}, generate 0 to N output values using the
-   * given {@code IFlatMapFn}. Designed for Java lambdas.
-   */
-  <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type);
-
-  /**
-   * For each element of this {@code PCollection}, generate 0 to N output values using the
-   * given {@code IFlatMapFn}. Designed for Java lambdas.
-   */
-  <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
-  /**
-   * For each element of this {@code PCollection}, generate 0 to N output values using the
-   * given {@code IFlatMapFn}. Designed for Java lambdas.
-   */
-  <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type);
-
-  /**
-   * For each element of this {@code PCollection}, generate 0 to N output values using the
-   * given {@code IFlatMapFn}. Designed for Java lambdas.
-   */
-  <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
-  /**
-   * For each element of this {@code PCollection}, generate one output value using the
-   * given {@code IMapFn}. Designed for Java lambdas.
-   */
-  <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type);
-
-  /**
-   * For each element of this {@code PCollection}, generate one output value using the
-   * given {@code IMapFn}. Designed for Java lambdas.
-   */
-  <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
-  /**
-   * For each element of this {@code PCollection}, generate one output value using the
-   * given {@code IMapFn}. Designed for Java lambdas.
-   */
-  <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type);
-
-  /**
-   * For each element of this {@code PCollection}, generate one output value using the
-   * given {@code IMapFn}. Designed for Java lambdas.
-   */
-  <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
-  /**
-   * Filter elements of this {@code PCollection} using the given {@code IFilterFn}.
-   * Designed for Java lambdas.
-   */
-  PCollection<S> filter(IFilterFn<S> fn);
-
-  /**
-   * Filter elements of this {@code PCollection} using the given {@code IFilterFn}.
-   * Designed for Java lambdas.
-   */
-  PCollection<S> filter(String name, IFilterFn<S> fn);
 
   /**
    * Write the contents of this {@code PCollection} to the given {@code Target},
@@ -349,12 +260,6 @@ public interface PCollection<S> {
   <K> PTable<K, S> by(MapFn<S, K> extractKeyFn, PType<K> keyType);
 
   /**
-   * Apply the given {@code IMapFn} to each element of this instance in order to
-   * create a {@code PTable}. Designed for use with Java 8 lambdas.
-   */
-  <K> PTable<K, S> by(IMapFn<S, K> extractKeyFn, PType<K> keyType);
-
-  /**
    * Apply the given map function to each element of this instance in order to
    * create a {@code PTable}.
    *
@@ -385,4 +290,5 @@ public interface PCollection<S> {
    * Returns a {@code PCollection} that contains the result of aggregating all values in this instance.
    */
   PCollection<S> aggregate(Aggregator<S> aggregator);
+  
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
index 6ac86de..756855c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
@@ -79,17 +79,6 @@ public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
    */
   <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype);
 
-
-  /**
-   * Maps the {@code Iterable<V>} elements of each record to a new type. Just like
-   * any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be
-   * called once. Designed for Java lambdas
-   * @param mapFn The mapping function (can be lambda/method ref)
-   * @param ptype The serialization infromation for the returned data
-   * @return A new {@code PTable} instance
-   */
-  <U> PTable<K, U> mapValues(IMapFn<Iterable<V>, U> mapFn, PType<U> ptype);
-  
   /**
    * Maps the {@code Iterable<V>} elements of each record to a new type. Just like
    * any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/PTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PTable.java b/crunch-core/src/main/java/org/apache/crunch/PTable.java
index 5609c3f..74cade8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PTable.java
@@ -107,12 +107,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
 
   /**
    * Returns a {@code PTable} that has the same keys as this instance, but
-   * uses the given function to map the values. Designed for Java lambdas.
-   */
-  <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype);
-
-  /**
-   * Returns a {@code PTable} that has the same keys as this instance, but
    * uses the given function to map the values.
    */
   <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype);
@@ -125,12 +119,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
 
   /**
    * Returns a {@code PTable} that has the same values as this instance, but
-   * uses the given function to map the keys. Designed for Java lambdas.
-   */
-  <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype);
-
-  /**
-   * Returns a {@code PTable} that has the same values as this instance, but
    * uses the given function to map the keys.
    */
   <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype);
@@ -149,12 +137,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
 
   /**
    * Apply the given filter function to this instance and return the resulting
-   * {@code PTable}. Designed for Java lambdas.
-   */
-  PTable<K, V> filter(IFilterFn<Pair<K, V>> fn);
-
-  /**
-   * Apply the given filter function to this instance and return the resulting
    * {@code PTable}.
    *
    * @param name
@@ -165,12 +147,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
   PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn);
 
   /**
-   * Apply the given filter function to this instance and return the resulting
-   * {@code PTable}. Designed for Java lambdas.
-   */
-  PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> fn);
-
-  /**
    * Returns a PTable made up of the pairs in this PTable with the largest value
    * field.
    *

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java b/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java
deleted file mode 100644
index 8560fab..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.FilterFn;
-import org.apache.crunch.IDoFn;
-import org.apache.crunch.IFilterFn;
-import org.apache.crunch.IFlatMapFn;
-import org.apache.crunch.IMapFn;
-import org.apache.crunch.MapFn;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-public class IFnHelpers {
-
-  public static <S, T> DoFn<S, T> wrap(final org.apache.crunch.IDoFn<S, T> fn) {
-    return new IDoFnWrapper(fn);
-  }
-
-  public static <S, T> DoFn<S, T> wrapFlatMap(final IFlatMapFn<S, T> fn) {
-    return new DoFn<S, T>() {
-      @Override
-      public void process(S input, Emitter<T> emitter) {
-        for (T t : fn.process(input)) {
-          emitter.emit(t);
-        }
-      }
-    };
-  }
-
-  public static <S, T> MapFn<S, T> wrapMap(final IMapFn<S, T> fn) {
-    return new MapFn<S, T>() {
-      @Override
-      public T map(S input) {
-        return fn.map(input);
-      }
-    };
-  }
-
-  public static <S> FilterFn<S> wrapFilter(final IFilterFn<S> fn) {
-    return new FilterFn<S>() {
-      @Override
-      public boolean accept(S input) {
-        return fn.accept(input);
-      }
-    };
-  }
-
-  static class IDoFnWrapper<S, T> extends DoFn<S, T> {
-
-    private final org.apache.crunch.IDoFn<S, T> fn;
-    private transient ContextImpl<S, T> ctxt;
-
-    public IDoFnWrapper(org.apache.crunch.IDoFn<S, T> fn) {
-      this.fn = fn;
-    }
-
-    @Override
-    public void initialize() {
-      super.initialize();
-      if (getContext() == null) {
-        this.ctxt = new ContextImpl<S, T>(getConfiguration());
-      } else {
-        this.ctxt = new ContextImpl<S, T>(getContext());
-      }
-    }
-
-    @Override
-    public void process(S input, Emitter<T> emitter) {
-      fn.process(ctxt.update(input, emitter));
-    }
-  }
-
-  static class ContextImpl<S, T> implements IDoFn.Context<S, T> {
-    private S element;
-    private Emitter<T> emitter;
-    private TaskInputOutputContext context;
-    private Configuration conf;
-
-    public ContextImpl(TaskInputOutputContext context) {
-      this.context = context;
-      this.conf = context.getConfiguration();
-    }
-
-    public ContextImpl(Configuration conf) {
-      this.context = null;
-      this.conf = conf;
-    }
-
-    public ContextImpl update(S element, Emitter<T> emitter) {
-      this.element = element;
-      this.emitter = emitter;
-      return this;
-    }
-
-    public S element() {
-      return element;
-    }
-
-    public void emit(T t) {
-      emitter.emit(t);
-    }
-
-    public TaskInputOutputContext getContext() {
-      return context;
-    }
-
-    public Configuration getConfiguration() {
-      return conf;
-    }
-
-    public void increment(String groupName, String counterName) {
-      increment(groupName, counterName, 1);
-    }
-
-    public void increment(String groupName, String counterName, long value) {
-      if (context != null) {
-        context.getCounter(groupName, counterName).increment(value);
-      }
-    }
-
-    public void increment(Enum<?> counterName) {
-      increment(counterName, 1);
-    }
-
-    public void increment(Enum<?> counterName, long value) {
-      if (context != null) {
-        context.getCounter(counterName).increment(value);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
index eb2d829..7bfacdf 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
@@ -25,16 +25,13 @@ import org.apache.crunch.CombineFn;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.ReadableData;
-import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.Aggregators;
-import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PType;
@@ -121,12 +118,6 @@ public class BaseGroupedTable<K, V> extends PCollectionImpl<Pair<K, Iterable<V>>
   }
 
   @Override
-  public <U> PTable<K, U> mapValues(IMapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
-    return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype);
-  }
-
-
-  @Override
   public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
     return PTables.mapValues(name, this, mapFn, ptype);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
index 2a5e1f5..7650ff5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
@@ -25,10 +25,6 @@ import org.apache.crunch.CachingOptions;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.FilterFn;
-import org.apache.crunch.IFilterFn;
-import org.apache.crunch.IFlatMapFn;
-import org.apache.crunch.IDoFn;
-import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
@@ -40,7 +36,6 @@ import org.apache.crunch.ReadableData;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.ExtractKeyFn;
-import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.io.ReadableSource;
@@ -145,7 +140,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   }
 
   @Override
-  public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
+  public <T> PCollection<T>  parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
     return parallelDo(name, fn, type, ParallelDoOptions.builder().build());
   }
 
@@ -171,86 +166,6 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
     return pipeline.getFactory().createDoTable(name, getChainingCollection(), fn, type, options);
   }
 
-  @Override
-  public <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type) {
-    return parallelDo(IFnHelpers.wrap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(IFnHelpers.wrap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type) {
-    return parallelDo(name, IFnHelpers.wrap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(name, IFnHelpers.wrap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options) {
-    return parallelDo(name, IFnHelpers.wrap(fn), type, options);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options) {
-    return parallelDo(name, IFnHelpers.wrap(fn), type, options);
-  }
-
-  @Override
-  public <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(IFnHelpers.wrapMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(IFnHelpers.wrapMap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(name, IFnHelpers.wrapMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(name, IFnHelpers.wrapMap(fn), type);
-  }
-
-  @Override
-  public PCollection<S> filter(IFilterFn<S> fn) {
-    return filter(IFnHelpers.wrapFilter(fn));
-  }
-
-  @Override
-  public PCollection<S> filter(String name, IFilterFn<S> fn) {
-    return filter(name, IFnHelpers.wrapFilter(fn));
-  }
-
   public PCollection<S> write(Target target) {
     if (materializedAt != null) {
       getPipeline().write(
@@ -355,11 +270,6 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   }
 
   @Override
-  public <K> PTable<K, S> by(IMapFn<S, K> mapFn, PType<K> keyType) {
-    return parallelDo(new ExtractKeyFn<K, S>(IFnHelpers.wrapMap(mapFn)), getTypeFamily().tableOf(keyType, getPType()));
-  }
-
-  @Override
   public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
     return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
index 6bc3a41..4ba4d49 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
@@ -21,8 +21,6 @@ import com.google.common.collect.Lists;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.IFilterFn;
-import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
@@ -31,7 +29,6 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.Target;
-import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.lib.Cogroup;
@@ -135,24 +132,10 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple
   }
 
   @Override
-  public PTable<K, V> filter(IFilterFn<Pair<K, V>> filterFn) {
-    return parallelDo(IFnHelpers.wrapFilter(filterFn), getPTableType());
-  }
-
-  @Override
-  public PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> filterFn) {
-    return parallelDo(name, IFnHelpers.wrapFilter(filterFn), getPTableType());
-  }
-
-  @Override
   public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) {
     return PTables.mapValues(this, mapFn, ptype);
   }
 
-  @Override
-  public <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype) {
-    return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype);
-  }
 
   @Override
   public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) {
@@ -165,11 +148,6 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple
   }
 
   @Override
-  public <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype) {
-    return PTables.mapKeys(this, IFnHelpers.wrapMap(mapFn), ptype);
-  }
-
-  @Override
   public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) {
     return PTables.mapKeys(name, this, mapFn, ptype);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index 89671a3..087a31d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -21,7 +21,6 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
-import java.io.ObjectStreamClass;
 import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.Set;
@@ -36,10 +35,6 @@ import org.apache.crunch.Aggregator;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.FilterFn;
-import org.apache.crunch.IFilterFn;
-import org.apache.crunch.IFlatMapFn;
-import org.apache.crunch.IDoFn;
-import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
@@ -51,7 +46,6 @@ import org.apache.crunch.ReadableData;
 import org.apache.crunch.PipelineCallable;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.ExtractKeyFn;
-import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
 import org.apache.crunch.lib.Aggregate;
@@ -205,84 +199,6 @@ public class MemCollection<S> implements PCollection<S> {
   }
 
   @Override
-  public <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type) {
-    return parallelDo(null, fn, type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(null, fn, type);
-  }
-
-  @Override
-  public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type) {
-    return parallelDo(name, fn, type, null);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(name, fn, type, null);
-  }
-
-  @Override
-  public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options) {
-    return parallelDo(name, IFnHelpers.wrap(fn), type, options);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options) {
-    return parallelDo(name, IFnHelpers.wrap(fn), type, options);
-  }
-
-  @Override
-  public <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(IFnHelpers.wrapMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(IFnHelpers.wrapMap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(name, IFnHelpers.wrapMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(name, IFnHelpers.wrapMap(fn), type);
-  }
-
-  public PCollection<S> filter(IFilterFn<S> fn) {
-    return filter(IFnHelpers.wrapFilter(fn));
-  }
-
-  public PCollection<S> filter(String name, IFilterFn<S> fn) {
-    return filter(name, IFnHelpers.wrapFilter(fn));
-  }
-
-  @Override
   public PCollection<S> write(Target target) {
     getPipeline().write(this, target);
     return this;
@@ -404,11 +320,6 @@ public class MemCollection<S> implements PCollection<S> {
   }
 
   @Override
-  public <K> PTable<K, S> by(IMapFn<S, K> mapFn, PType<K> keyType) {
-    return parallelDo(new ExtractKeyFn<K, S>(IFnHelpers.wrapMap(mapFn)), getTypeFamily().tableOf(keyType, getPType()));
-  }
-
-  @Override
   public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
     return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index 5451533..e8bf5e6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -22,7 +22,6 @@ import org.apache.crunch.CombineFn;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PGroupedTable;
@@ -30,7 +29,6 @@ import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.Aggregators;
-import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PTableType;
@@ -125,11 +123,6 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
   }
 
   @Override
-  public <U> PTable<K, U> mapValues(IMapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
-    return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype);
-  }
-
-  @Override
   public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
     return PTables.mapValues(name, this, mapFn, ptype);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
index 03b5a70..b90b656 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
@@ -24,8 +24,6 @@ import com.google.common.collect.ImmutableList;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.IFilterFn;
-import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PGroupedTable;
@@ -33,7 +31,6 @@ import org.apache.crunch.PObject;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Target;
-import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.lib.Cogroup;
 import org.apache.crunch.lib.Join;
@@ -138,26 +135,11 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
   }
 
   @Override
-  public PTable<K, V> filter(IFilterFn<Pair<K, V>> filterFn) {
-    return parallelDo(IFnHelpers.wrapFilter(filterFn), getPTableType());
-  }
-
-  @Override
-  public PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> filterFn) {
-    return parallelDo(name, IFnHelpers.wrapFilter(filterFn), getPTableType());
-  }
-
-  @Override
   public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) {
     return PTables.mapValues(this, mapFn, ptype);
   }
 
   @Override
-  public <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype) {
-    return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype);
-  }
-
-  @Override
   public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) {
     return PTables.mapValues(name, this, mapFn, ptype);
   }
@@ -168,11 +150,6 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
   }
 
   @Override
-  public <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype) {
-    return PTables.mapKeys(this, IFnHelpers.wrapMap(mapFn), ptype);
-  }
-
-  @Override
   public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) {
     return PTables.mapKeys(name, this, mapFn, ptype);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-dist/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-dist/pom.xml b/crunch-dist/pom.xml
index b7cd0e9..48d9b05 100644
--- a/crunch-dist/pom.xml
+++ b/crunch-dist/pom.xml
@@ -61,6 +61,10 @@ under the License.
       <groupId>org.apache.crunch</groupId>
       <artifactId>crunch-contrib</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-lambda</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-lambda/pom.xml b/crunch-lambda/pom.xml
new file mode 100644
index 0000000..e910517
--- /dev/null
+++ b/crunch-lambda/pom.xml
@@ -0,0 +1,67 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.crunch</groupId>
+        <artifactId>crunch-parent</artifactId>
+        <version>0.14.0-SNAPSHOT</version>
+    </parent>
+    <properties>
+        <java.source.version>1.8</java.source.version>
+        <java.target.version>1.8</java.target.version>
+    </properties>
+
+    <artifactId>crunch-lambda</artifactId>
+    <name>Apache Crunch Lambda</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.crunch</groupId>
+            <artifactId>crunch-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.crunch</groupId>
+            <artifactId>crunch-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java
new file mode 100644
index 0000000..5b8611d
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.lambda.fn.SBiFunction;
+import org.apache.crunch.lambda.fn.SFunction;
+import org.apache.crunch.lambda.fn.SSupplier;
+
+/**
+ * Crunch Aggregator composed from three functions: supply a fresh accumulator, fold a value in, produce the output
+ * @param <V> Type of values to be aggregated
+ * @param <A> Type of the accumulator that holds intermediate state while values are being aggregated
+ */
+public class LAggregator<V, A> extends Aggregators.SimpleAggregator<V> {
+
+    private final SSupplier<A> initialSupplier;
+    private final SBiFunction<A, V, A> combineFn;
+    private final SFunction<A, Iterable<V>> outputFn;
+    private A state; // current accumulator; not assigned until reset() has been called
+
+    public LAggregator(SSupplier<A> initialSupplier, SBiFunction<A, V, A> combineFn, SFunction<A, Iterable<V>> outputFn) {
+        this.outputFn = outputFn;
+        this.combineFn = combineFn;
+        this.initialSupplier = initialSupplier;
+    }
+
+    @Override
+    public void reset() {
+        this.state = initialSupplier.get(); // start over from whatever the supplier hands out
+    }
+
+    @Override
+    public void update(V v) {
+        this.state = combineFn.apply(this.state, v); // the function's return value becomes the new accumulator
+    }
+
+    @Override
+    public Iterable<V> results() {
+        return outputFn.apply(this.state); // convert the accumulator into the values to output
+    }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
new file mode 100644
index 0000000..6a8dd62
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.*;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.lambda.fn.SFunction;
+import org.apache.crunch.lambda.fn.SPredicate;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Java 8 friendly version of the {@link PCollection} interface, allowing distributed operations to be expressed in
+ * terms of lambda expressions and method references, instead of creating a new class implementation for each operation.
+ * @param <S> The type of the elements in this collection
+ */
+public interface LCollection<S> {
+    /**
+     * Get the underlying {@link PCollection} for this LCollection
+     */
+    PCollection<S> underlying();
+
+    /**
+     * Get the {@link LCollectionFactory} which can be used to create new Ltype instances
+     */
+    LCollectionFactory factory();
+
+    /**
+     * Transform this LCollection using a standard Crunch {@link DoFn}
+     */
+    default <T> LCollection<T> parallelDo(DoFn<S, T> fn, PType<T> pType) {
+        return factory().wrap(underlying().parallelDo(fn, pType));
+    }
+
+    /**
+     * Transform this LCollection to an {@link LTable} using a standard Crunch {@link DoFn}
+     */
+    default <K, V> LTable<K, V> parallelDo(DoFn<S, Pair<K, V>> fn, PTableType<K, V> pType) {
+        return factory().wrap(underlying().parallelDo(fn, pType));
+    }
+
+    /**
+     * Transform this LCollection using a Lambda-friendly {@link LDoFn}.
+     */
+    default <T> LCollection<T> parallelDo(LDoFn<S, T> fn, PType<T> pType) {
+        return parallelDo(new LDoFnWrapper<>(fn), pType);
+    }
+
+    /**
+     * Transform this LCollection to an {@link LTable} using a Lambda-friendly {@link LDoFn}.
+     */
+    default <K, V> LTable<K, V> parallelDo(LDoFn<S, Pair<K, V>> fn, PTableType<K, V> pType) {
+        return parallelDo(new LDoFnWrapper<>(fn), pType);
+    }
+
+    /**
+     * Map the elements of this collection 1-1 through the supplied function.
+     */
+    default <T> LCollection<T> map(SFunction<S, T> fn, PType<T> pType) {
+        return parallelDo(ctx -> ctx.emit(fn.apply(ctx.element())), pType);
+    }
+
+    /**
+     * Map the elements of this collection 1-1 through the supplied function to yield an {@link LTable}
+     */
+    default <K, V> LTable<K, V> map(SFunction<S, Pair<K, V>> fn, PTableType<K, V> pType) {
+        return parallelDo(ctx -> ctx.emit(fn.apply(ctx.element())), pType);
+    }
+
+    /**
+     * Map each element to zero or more output elements using the provided stream-returning function.
+     */
+    default <T> LCollection<T> flatMap(SFunction<S, Stream<T>> fn, PType<T> pType) {
+        return parallelDo(ctx -> fn.apply(ctx.element()).forEach(ctx::emit), pType);
+    }
+
+    /**
+     * Map each element to zero or more output elements using the provided stream-returning function to yield an
+     * {@link LTable}
+     */
+    default <K, V> LTable<K, V> flatMap(SFunction<S, Stream<Pair<K, V>>> fn, PTableType<K, V> pType) {
+        return parallelDo(ctx -> fn.apply(ctx.element()).forEach(ctx::emit), pType);
+    }
+
+    /**
+     * Filter and map in one step: elements for which the function returns an empty {@link Optional} are dropped.
+     */
+    default <T> LCollection<T> filterMap(SFunction<S, Optional<T>> fn, PType<T> pType) {
+        return parallelDo(ctx -> fn.apply(ctx.element()).ifPresent(ctx::emit), pType);
+    }
+
+    /**
+     * Filter and map in one step to an {@link LTable}: elements whose function result is empty are dropped.
+     */
+    default <K, V> LTable<K, V> filterMap(SFunction<S, Optional<Pair<K, V>>> fn, PTableType<K, V> pType) {
+        return parallelDo(ctx -> fn.apply(ctx.element()).ifPresent(ctx::emit), pType);
+    }
+
+    /**
+     * Filter the collection using the supplied predicate, keeping only the elements for which it returns true.
+     */
+    default LCollection<S> filter(SPredicate<S> predicate) {
+        return parallelDo(ctx -> { if (predicate.test(ctx.element())) ctx.emit(ctx.element());}, pType());
+    }
+
+    /**
+     * Union this LCollection with another LCollection of the same type
+     */
+    default LCollection<S> union(LCollection<S> other) {
+        return factory().wrap(underlying().union(other.underlying()));
+    }
+
+    /**
+     * Union this LCollection with a {@link PCollection} of the same type
+     */
+    default LCollection<S> union(PCollection<S> other) {
+        return factory().wrap(underlying().union(other));
+    }
+
+    /**
+     * Increment a counter for every element in the collection; every element is passed through unchanged.
+     */
+    default LCollection<S> increment(Enum<?> counter) {
+        return parallelDo(ctx -> { ctx.increment(counter); ctx.emit(ctx.element()); }, pType());
+    }
+
+    /**
+     * Increment a counter for every element in the collection; every element is passed through unchanged.
+     */
+    default LCollection<S> increment(String counterGroup, String counterName) {
+        return parallelDo(ctx -> { ctx.increment(counterGroup, counterName); ctx.emit(ctx.element()); }, pType());
+    }
+
+    /**
+     * Increment a counter for each element satisfying the predicate; all elements are passed through unchanged.
+     */
+    default LCollection<S> incrementIf(Enum<?> counter, SPredicate<S> condition) {
+        return parallelDo(ctx -> {
+            if (condition.test(ctx.element())) ctx.increment(counter);
+            ctx.emit(ctx.element()); // the predicate only gates counting, never the output
+        }, pType());
+    }
+
+    /**
+     * Increment a counter for each element satisfying the predicate; all elements are passed through unchanged.
+     */
+    default LCollection<S> incrementIf(String counterGroup, String counterName, SPredicate<S> condition) {
+        return parallelDo(ctx -> {
+            if (condition.test(ctx.element())) ctx.increment(counterGroup, counterName);
+            ctx.emit(ctx.element()); // the predicate only gates counting, never the output
+        }, pType());
+    }
+
+    /**
+     * Cache the underlying {@link PCollection} and return this LCollection
+     */
+    default LCollection<S> cache() {
+        underlying().cache();
+        return this;
+    }
+
+    /**
+     * Cache the underlying {@link PCollection} with the given {@link CachingOptions} and return this LCollection
+     */
+    default LCollection<S> cache(CachingOptions options) {
+        underlying().cache(options);
+        return this;
+    }
+
+    /**
+     * Key this LCollection by a key extracted from the element to yield a {@link LTable} mapping the key to the whole
+     * element.
+     */
+    default <K> LTable<K, S> by(SFunction<S, K> extractFn, PType<K> pType) {
+        return parallelDo(
+                ctx -> ctx.emit(Pair.of(extractFn.apply(ctx.element()), ctx.element())),
+                ptf().tableOf(pType, pType()));
+    }
+
+    /**
+     * Count distinct values in this LCollection, yielding an {@link LTable} mapping each value to the number
+     * of occurrences in the collection.
+     */
+    default LTable<S, Long> count() {
+        return map(a -> Pair.of(a, 1L), ptf().tableOf(pType(), ptf().longs()))
+                .groupByKey()
+                .combineValues(Aggregators.SUM_LONGS());
+    }
+
+    /**
+     * Obtain the contents of this LCollection as a {@link Stream} that can be processed locally. Note, this may trigger
+     * your job to execute in a distributed environment if the pipeline has not yet been run.
+     */
+    default Stream<S> materialize() {
+        return StreamSupport.stream(underlying().materialize().spliterator(), false);
+    }
+
+    /**
+     * Get the {@link PTypeFamily} representing how elements of this collection may be serialized.
+     */
+    default PTypeFamily ptf() { return underlying().getPType().getFamily(); }
+
+    /**
+     * Get the {@link PType} representing how elements of this collection may be serialized.
+     */
+    default PType<S> pType() { return underlying().getPType(); }
+
+    /**
+     * Write this collection to the specified {@link Target} and return this LCollection
+     */
+    default LCollection<S> write(Target target) {
+        underlying().write(target);
+        return this;
+    }
+
+    /**
+     * Write this collection to the specified {@link Target} with the given {@link org.apache.crunch.Target.WriteMode}
+     */
+    default LCollection<S> write(Target target, Target.WriteMode writeMode) {
+        underlying().write(target, writeMode);
+        return this;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java
new file mode 100644
index 0000000..4b208d2
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+
+/**
+ * Factory for creating {@link LCollection}, {@link LTable} and {@link LGroupedTable} objects from their corresponding
+ * {@link PCollection}, {@link PTable} and {@link PGroupedTable} types. You probably don't want to use or implement this
+ * interface directly. You should start with the {@link Lambda} class instead.
+ */
+public interface LCollectionFactory {
+    /**
+     * Wrap the given {@link PCollection} so that it can be used through the {@link LCollection} API
+     */
+    <S> LCollection<S> wrap(PCollection<S> collection);
+
+    /**
+     * Wrap the given {@link PTable} so that it can be used through the {@link LTable} API
+     */
+    <K, V> LTable<K, V> wrap(PTable<K, V> collection);
+
+    /**
+     * Wrap the given {@link PGroupedTable} so that it can be used through the {@link LGroupedTable} API
+     */
+    <K, V> LGroupedTable<K, V> wrap(PGroupedTable<K, V> collection);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java
new file mode 100644
index 0000000..4bedfa7
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+
+class LCollectionFactoryImpl implements LCollectionFactory {
+    // Every wrapper below is an anonymous subtype that only supplies factory() and underlying().
+    @Override
+    public <S> LCollection<S> wrap(PCollection<S> collection) {
+        return new LCollection<S>() {
+            @Override
+            public LCollectionFactory factory() {
+                return LCollectionFactoryImpl.this;
+            }
+
+            @Override
+            public PCollection<S> underlying() {
+                return collection;
+            }
+        };
+    }
+
+    @Override
+    public <K, V> LTable<K, V> wrap(PTable<K, V> collection) {
+        return new LTable<K, V>() {
+            @Override
+            public LCollectionFactory factory() {
+                return LCollectionFactoryImpl.this;
+            }
+
+            @Override
+            public PTable<K, V> underlying() {
+                return collection;
+            }
+        };
+    }
+
+    @Override
+    public <K, V> LGroupedTable<K, V> wrap(PGroupedTable<K, V> collection) {
+        return new LGroupedTable<K, V>() {
+            @Override
+            public LCollectionFactory factory() {
+                return LCollectionFactoryImpl.this;
+            }
+
+            @Override
+            public PGroupedTable<K, V> underlying() {
+                return collection;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java
new file mode 100644
index 0000000..1be8085
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.DoFn;
+
+import java.io.Serializable;
+
+/**
+ * A Java lambdas friendly version of the {@link DoFn} class: S is the input element type, T the emitted output type.
+ */
+public interface LDoFn<S, T> extends Serializable {
+  /** Process one input; the element and the means to emit output are both obtained from {@code context}. */
+  void process(LDoFnContext<S, T> context);
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java
new file mode 100644
index 0000000..3743a2f
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Context object handed to an {@link LDoFn}, for implementing distributed operations in terms of Lambda expressions.
+ * @param <S> Input type of LDoFn
+ * @param <T> Output type of LDoFn
+ */
+public interface LDoFnContext<S, T> {
+    /** Get the input element currently being processed */
+    S element();
+
+    /** Emit t to the output */
+    void emit(T t);
+
+    /** Get the underlying {@link TaskInputOutputContext} (special cases only); {@code LDoFnWrapper} may supply null */
+    TaskInputOutputContext getContext();
+
+    /** Get the current Hadoop {@link Configuration} */
+    Configuration getConfiguration();
+
+    /** Increment the counter identified by group and name by 1 */
+    void increment(String groupName, String counterName);
+
+    /** Increment the counter identified by group and name by value */
+    void increment(String groupName, String counterName, long value);
+
+    /** Increment the counter identified by the enum constant by 1 */
+    void increment(Enum<?> counterName);
+
+    /** Increment the counter identified by the enum constant by value */
+    void increment(Enum<?> counterName, long value);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java
new file mode 100644
index 0000000..76087d5
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+class LDoFnWrapper<S, T> extends DoFn<S, T> {
+
+    private final LDoFn<S, T> fn;
+    private transient Context<S, T> ctxt;
+
+    public LDoFnWrapper(LDoFn<S, T> fn) {
+        this.fn = fn;
+    }
+
+    @Override
+    public void initialize() {
+        super.initialize();
+        if (getContext() == null) {
+            this.ctxt = new Context<>(getConfiguration());
+        } else {
+            this.ctxt = new Context<>(getContext());
+        }
+    }
+
+    @Override
+    public void process(S input, Emitter<T> emitter) {
+        fn.process(ctxt.update(input, emitter));
+    }
+    static class Context<S, T> implements LDoFnContext<S, T> {
+        private S element;
+        private Emitter<T> emitter;
+        private TaskInputOutputContext context;
+        private Configuration conf;
+
+        public Context(TaskInputOutputContext context) {
+            this.context = context;
+            this.conf = context.getConfiguration();
+        }
+
+        public Context(Configuration conf) {
+            this.context = null;
+            this.conf = conf;
+        }
+
+        public Context<S, T> update(S element, Emitter<T> emitter) {
+            this.element = element;
+            this.emitter = emitter;
+            return this;
+        }
+
+        public S element() {
+            return element;
+        }
+
+        public void emit(T t) {
+            emitter.emit(t);
+        }
+
+        public TaskInputOutputContext getContext() {
+            return context;
+        }
+
+        public Configuration getConfiguration() {
+            return conf;
+        }
+
+        public void increment(String groupName, String counterName) {
+            increment(groupName, counterName, 1);
+        }
+
+        public void increment(String groupName, String counterName, long value) {
+            if (context != null) {
+                context.getCounter(groupName, counterName).increment(value);
+            }
+        }
+
+        public void increment(Enum<?> counterName) {
+            increment(counterName, 1);
+        }
+
+        public void increment(Enum<?> counterName, long value) {
+            if (context != null) {
+                context.getCounter(counterName).increment(value);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java
new file mode 100644
index 0000000..10209e0
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.Aggregator;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lambda.fn.SBiConsumer;
+import org.apache.crunch.lambda.fn.SBiFunction;
+import org.apache.crunch.lambda.fn.SBinaryOperator;
+import org.apache.crunch.lambda.fn.SFunction;
+import org.apache.crunch.lambda.fn.SSupplier;
+import org.apache.crunch.types.PType;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+/**
+ * Java 8 friendly version of the {@link PGroupedTable} interface, allowing distributed operations to be expressed in
+ * terms of lambda expressions and method references, instead of creating a new class implementation for each operation.
+ * @param <K> key type for this table
+ * @param <V> value type for this table
+ */
+public interface LGroupedTable<K, V> extends LCollection<Pair<K, Iterable<V>>>  {
+    /**
+     * Get the underlying {@link PGroupedTable} for this LGroupedTable
+     */
+    PGroupedTable<K, V> underlying();
+
+    /**
+     * Combine the value part of the table using the provided Crunch {@link Aggregator}. This will be optimised into
+     * both a combine and reduce in the MapReduce implementation, with similar optimisations available for other
+     * implementations.
+     */
+    default LTable<K, V> combineValues(Aggregator<V> aggregator) {
+        return factory().wrap(underlying().combineValues(aggregator));
+    }
+
+    /**
+     * Combine the value part of the table using the given functions. The supplier is used to create a new aggregating
+     * type, the combineFn adds a value into the aggregate, and the output function transforms the aggregate into
+     * an iterable of the original value type. For example, summation can be expressed as follows:
+     *
+     * <pre>{@code myGroupedTable.combineValues(() -> 0, (sum, value) -> sum + value, Collections::singleton) }</pre>
+     *
+     * <p>This will be optimised into both a combine and reduce in the MapReduce implementation, with similar
+     * optimizations *available for other implementations.</p>
+     */
+    default <A> LTable<K, V> combineValues(
+            SSupplier<A> initialSupplier,
+            SBiFunction<A, V, A> combineFn,
+            SFunction<A, Iterable<V>> outputFn) {
+        return combineValues(new LAggregator<>(initialSupplier, combineFn, outputFn));
+    }
+
+    /**
+     * Map the values in this LGroupedTable using a custom function. This function operates over a stream which can
+     * be consumed only once.
+     *
+     * <p>Note that in serialization systems which heavily reuse objects (such as Avro), you may
+     * in fact get given the same object multiple times with different data as you consume the stream, meaning it may
+     * be necessary to detach values.</p>
+     */
+    default <T> LTable<K, T> mapValues(SFunction<Stream<V>, T> fn, PType<T> pType) {
+        return parallelDo(
+                ctx -> ctx.emit(Pair.of(
+                        ctx.element().first(),
+                        fn.apply(StreamSupport.stream(ctx.element().second().spliterator(), false)))
+                ), ptf().tableOf(keyType(), pType));
+    }
+
+    /**
+     * Collect the values into an aggregate type. This differs from combineValues in that it outputs the aggregate type
+     * rather than the value type, and is designed to happen in one step (rather than being optimised into multiple
+     * levels). This makes it much more suitable for assembling collections than computing simple numeric aggregates.
+     *
+     * <p>The supplier provides an "empty" object, then the consumer is called with each value. For example, to collect
+     * all values into a {@link Collection}, one can do this:</p>
+     * <pre>{@code
+     * lgt.collectValues(ArrayList::new, Collection::add, lgt.ptf().collections(lgt.valueType()))
+     * }</pre>
+     *
+     * <p>This is in fact the default implementation for the collectAllValues() method.</p>
+     *
+     * <p>Note that in serialization systems which heavily reuse objects (such as Avro), you may
+     * in fact get given the same object multiple times with different data as you consume the stream, meaning it may
+     * be necessary to detach values.</p>
+     */
+    default <C> LTable<K, C> collectValues(SSupplier<C> emptySupplier, SBiConsumer<C, V> addFn, PType<C> pType) {
+        return parallelDo(ctx -> {
+            C coll = emptySupplier.get();
+            ctx.element().second().forEach(v -> addFn.accept(coll, v));
+            ctx.emit(Pair.of(ctx.element().first(), coll));
+        }, ptf().tableOf(keyType(), pType));
+    }
+
+    /**
+     * Collect all values for each key into a {@link Collection}
+     */
+    default LTable<K, Collection<V>> collectAllValues() {
+        return collectValues(ArrayList::new, Collection::add, ptf().collections(valueType()));
+    }
+
+    /**
+     * Collect all unique values for each key into a {@link Collection} (note that the value type must have a correctly-
+     * defined equals() and hashcode().
+     */
+    default LTable<K, Collection<V>> collectUniqueValues() {
+        return collectValues(HashSet::new, Collection::add, ptf().collections(valueType()));
+    }
+
+    /**
+     * Reduce the values for each key using the an associative binary operator.
+     * For example {@code reduceValues((a, b) -> a + b)} for summation, {@code reduceValues((a, b) -> a + ", " + b}
+     * for comma-separated string concatenation and {@code reduceValues((a, b) -> a > b ? a : b} for maximum value.
+     */
+    default LTable<K, V> reduceValues(SBinaryOperator<V> operator) {
+        return combineValues(() -> (V)null, (a, b) -> a == null ? b : operator.apply(a, b), Collections::singleton);
+    }
+
+    /**
+     * Ungroup this LGroupedTable back into an {@link LTable}. This will still trigger a "reduce" operation, so is
+     * usually only used in special cases like producing a globally-ordered list by feeding the everything through
+     * a single reducers.
+     */
+    default LTable<K, V> ungroup() {
+        return factory().wrap(underlying().ungroup());
+    }
+
+    /**
+     * Get a {@link PType} which can be used to serialize the key part of this grouped table
+     */
+    default PType<K> keyType() {
+        return underlying().getGroupedTableType().getTableType().getKeyType();
+    }
+
+    /**
+     * Get a {@link PType} which can be used to serialize the value part of this grouped table
+     */
+    default PType<V> valueType() {
+        return underlying().getGroupedTableType().getTableType().getValueType();
+    }
+
+}