You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by da...@apache.org on 2016/01/13 00:05:20 UTC
[1/2] crunch git commit: Java 8 lambda support for Apache Crunch.
Repository: crunch
Updated Branches:
refs/heads/master f8920d355 -> 7d7af4ef4
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
new file mode 100644
index 0000000..0b4e4fa
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Target;
+import org.apache.crunch.lambda.fn.SFunction;
+import org.apache.crunch.lambda.fn.SPredicate;
+import org.apache.crunch.lib.join.DefaultJoinStrategy;
+import org.apache.crunch.lib.join.JoinStrategy;
+import org.apache.crunch.lib.join.JoinType;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import java.util.Collection;
+
+/**
+ * Java 8 friendly version of the {@link PTable} interface, allowing distributed operations to be expressed in
+ * terms of lambda expressions and method references, instead of creating a new class implementation for each operation.
+ * @param <K> key type for this table
+ * @param <V> value type for this table
+ */
+public interface LTable<K, V> extends LCollection<Pair<K, V>> {
+    /**
+     * Get the underlying {@link PTable} for this LTable
+     */
+    PTable<K, V> underlying();
+
+    /**
+     * Group this table by key to yield a {@link LGroupedTable}, using the pipeline's default grouping settings
+     */
+    default LGroupedTable<K, V> groupByKey() {
+        return factory().wrap(underlying().groupByKey());
+    }
+
+    /**
+     * Group this table by key to yield a {@link LGroupedTable}
+     * @param numReducers value passed straight through to {@link PTable#groupByKey(int)}
+     */
+    default LGroupedTable<K, V> groupByKey(int numReducers) {
+        return factory().wrap(underlying().groupByKey(numReducers));
+    }
+
+    /**
+     * Group this table by key to yield a {@link LGroupedTable}
+     * @param opts the {@link GroupingOptions} passed straight through to {@link PTable#groupByKey(GroupingOptions)}
+     */
+    default LGroupedTable<K, V> groupByKey(GroupingOptions opts) {
+        return factory().wrap(underlying().groupByKey(opts));
+    }
+
+    /**
+     * Get an {@link LCollection} containing just the keys from this table
+     */
+    default LCollection<K> keys() {
+        return factory().wrap(underlying().keys());
+    }
+
+    /**
+     * Get an {@link LCollection} containing just the values from this table
+     */
+    default LCollection<V> values() {
+        return factory().wrap(underlying().values());
+    }
+
+    /**
+     * Transform the keys of this table using the given function, leaving the values untouched
+     * @param fn function applied to each key
+     * @param pType {@link PType} used to serialize the new keys
+     */
+    default <T> LTable<T, V> mapKeys(SFunction<K, T> fn, PType<T> pType) {
+        return parallelDo(
+                ctx -> ctx.emit(Pair.of(fn.apply(ctx.element().first()), ctx.element().second())),
+                ptf().tableOf(pType, valueType()));
+    }
+
+    /**
+     * Transform the values of this table using the given function, leaving the keys untouched
+     * @param fn function applied to each value
+     * @param pType {@link PType} used to serialize the new values
+     */
+    default <T> LTable<K, T> mapValues(SFunction<V, T> fn, PType<T> pType) {
+        return parallelDo(
+                ctx -> ctx.emit(Pair.of(ctx.element().first(), fn.apply(ctx.element().second()))),
+                ptf().tableOf(keyType(), pType));
+    }
+
+    /**
+     * Join this table to another {@link LTable} which has the same key type using the provided {@link JoinType} and
+     * {@link JoinStrategy}
+     */
+    default <U> LTable<K, Pair<V, U>> join(LTable<K, U> other, JoinType joinType, JoinStrategy<K, V, U> joinStrategy) {
+        return factory().wrap(joinStrategy.join(underlying(), other.underlying(), joinType));
+    }
+
+    /**
+     * Join this table to another {@link LTable} which has the same key type using the provided {@link JoinType} and
+     * the {@link DefaultJoinStrategy} (reduce-side join).
+     */
+    default <U> LTable<K, Pair<V, U>> join(LTable<K, U> other, JoinType joinType) {
+        return join(other, joinType, new DefaultJoinStrategy<>());
+    }
+
+    /**
+     * Inner join this table to another {@link LTable} which has the same key type using a reduce-side join
+     */
+    default <U> LTable<K, Pair<V, U>> join(LTable<K, U> other) {
+        return join(other, JoinType.INNER_JOIN);
+    }
+
+    /**
+     * Cogroup this table with another {@link LTable} with the same key type, collecting the set of values from
+     * each side.
+     */
+    default <U> LTable<K, Pair<Collection<V>, Collection<U>>> cogroup(LTable<K, U> other) {
+        return factory().wrap(underlying().cogroup(other.underlying()));
+    }
+
+    /**
+     * Get the underlying {@link PTableType} used to serialize key/value pairs in this table
+     */
+    default PTableType<K, V> pType() {
+        return underlying().getPTableType();
+    }
+
+    /**
+     * Get a {@link PType} which can be used to serialize the key part of this table
+     */
+    default PType<K> keyType() {
+        return underlying().getKeyType();
+    }
+
+    /**
+     * Get a {@link PType} which can be used to serialize the value part of this table
+     */
+    default PType<V> valueType() {
+        return underlying().getValueType();
+    }
+
+    /**
+     * Write this table to the {@link Target} supplied.
+     * @return this table, to allow further chaining
+     */
+    default LTable<K, V> write(Target target) {
+        underlying().write(target);
+        return this;
+    }
+
+    /**
+     * Write this table to the {@link Target} supplied, using the given {@link Target.WriteMode}.
+     * @return this table, to allow further chaining
+     */
+    default LTable<K, V> write(Target target, Target.WriteMode writeMode) {
+        underlying().write(target, writeMode);
+        return this;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Every element is re-emitted unchanged, so the returned table has the same contents as this one.</p>
+     */
+    default LTable<K, V> increment(Enum<?> counter) {
+        return parallelDo(ctx -> {
+            ctx.increment(counter);
+            // Re-emit so that counting is a pass-through side effect, not a filter that drops every element.
+            ctx.emit(ctx.element());
+        }, pType());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Every element is re-emitted unchanged, so the returned table has the same contents as this one.</p>
+     */
+    default LTable<K, V> increment(String counterGroup, String counterName) {
+        return parallelDo(ctx -> {
+            ctx.increment(counterGroup, counterName);
+            ctx.emit(ctx.element());
+        }, pType());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Every element is re-emitted unchanged whether or not it satisfies {@code condition}; only the counter
+     * update is conditional.</p>
+     */
+    default LTable<K, V> incrementIf(Enum<?> counter, SPredicate<Pair<K, V>> condition) {
+        return parallelDo(ctx -> {
+            if (condition.test(ctx.element())) {
+                ctx.increment(counter);
+            }
+            ctx.emit(ctx.element());
+        }, pType());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Every element is re-emitted unchanged whether or not it satisfies {@code condition}; only the counter
+     * update is conditional.</p>
+     */
+    default LTable<K, V> incrementIf(String counterGroup, String counterName, SPredicate<Pair<K, V>> condition) {
+        return parallelDo(ctx -> {
+            if (condition.test(ctx.element())) {
+                ctx.increment(counterGroup, counterName);
+            }
+            ctx.emit(ctx.element());
+        }, pType());
+    }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java
new file mode 100644
index 0000000..07fad2b
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+
+/**
+ * Entry point for the crunch-lambda API. Use this to create {@link LCollection}, {@link LTable} and
+ * {@link LGroupedTable} objects from their corresponding {@link PCollection}, {@link PTable} and {@link PGroupedTable}
+ * types.
+ *
+ * <p>The crunch-lambda API allows you to write Crunch pipelines using lambda expressions and method references instead
+ * of creating classes (anonymous, inner, or top level) for each operation that needs to be completed. Many pipelines
+ * are composed of a large number of simple operations, rather than a small number of complex operations, making this
+ * strategy much more efficient to code and easy to read for those able to use Java 8 in their distributed computation
+ * environments.</p>
+ *
+ * <p>You use the API by wrapping your Crunch type into an L-type object. This class provides static methods for that.
+ * You can then use the lambda API methods on the L-type object, yielding more L-type objects. If at any point you need
+ * to go back to the standard Crunch world (for compatibility with existing code or complex use cases), you can at any
+ * time call underlying() on an L-type object to get a Crunch object</p>
+ *
+ * <p>Example (the obligatory wordcount):</p>
+ *
+ * <pre>{@code
+ * Pipeline pipeline = new MRPipeline(getClass());
+ * LCollection<String> inputText = Lambda.wrap(pipeline.readTextFile("/path/to/input/file"));
+ * inputText.flatMap(line -> Arrays.stream(line.split(" ")), Writables.strings())
+ * .count()
+ * .map(wordCountPair -> wordCountPair.first() + ": " + wordCountPair.second(), strings())
+ * .write(To.textFile("/path/to/output/file"));
+ * pipeline.run();
+ * }</pre>
+ *
+ */
+public class Lambda {
+ private static LCollectionFactory INSTANCE = new LCollectionFactoryImpl();
+
+ public static <S> LCollection<S> wrap(PCollection<S> collection) { return INSTANCE.wrap(collection); }
+ public static <K, V> LTable<K, V> wrap(PTable<K, V> collection) { return INSTANCE.wrap(collection); }
+ public static <K, V> LGroupedTable<K, V> wrap(PGroupedTable<K, V> collection) { return INSTANCE.wrap(collection); }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java
new file mode 100644
index 0000000..6e5030f
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.BiConsumer;
+
+/**
+ * A {@link BiConsumer} that is also {@link Serializable}, so lambdas and method references
+ * targeting this type are serializable.
+ *
+ * @param <T> type of the first argument
+ * @param <U> type of the second argument
+ */
+@FunctionalInterface
+public interface SBiConsumer<T, U> extends BiConsumer<T, U>, Serializable {
+    // No extra members: this type only adds Serializable to BiConsumer.
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java
new file mode 100644
index 0000000..5aac5bc
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.BiFunction;
+
+/**
+ * A {@link BiFunction} that is also {@link Serializable}, so lambdas and method references
+ * targeting this type are serializable.
+ *
+ * @param <T> type of the first argument
+ * @param <U> type of the second argument
+ * @param <R> type of the result
+ */
+@FunctionalInterface
+public interface SBiFunction<T, U, R> extends BiFunction<T, U, R>, Serializable {
+    // No extra members: this type only adds Serializable to BiFunction.
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java
new file mode 100644
index 0000000..d1e4cc0
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.BinaryOperator;
+
+/**
+ * A {@link BinaryOperator} that is also {@link Serializable}, so lambdas and method references
+ * targeting this type are serializable.
+ *
+ * @param <T> type of both operands and of the result
+ */
+@FunctionalInterface
+public interface SBinaryOperator<T> extends BinaryOperator<T>, Serializable {
+    // No extra members: this type only adds Serializable to BinaryOperator.
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java
new file mode 100644
index 0000000..90f4a99
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.Consumer;
+
+/**
+ * A {@link Consumer} that is also {@link Serializable}, so lambdas and method references
+ * targeting this type are serializable.
+ *
+ * @param <T> type of the consumed value
+ */
+@FunctionalInterface
+public interface SConsumer<T> extends Consumer<T>, Serializable {
+    // No extra members: this type only adds Serializable to Consumer.
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java
new file mode 100644
index 0000000..d120efe
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+/**
+ * A {@link Function} that is also {@link Serializable}, so lambdas and method references
+ * targeting this type are serializable.
+ *
+ * @param <T> type of the input
+ * @param <R> type of the result
+ */
+@FunctionalInterface
+public interface SFunction<T, R> extends Function<T, R>, Serializable {
+    // No extra members: this type only adds Serializable to Function.
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java
new file mode 100644
index 0000000..9e90bab
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.Predicate;
+
+/**
+ * A {@link Predicate} that is also {@link Serializable}, so lambdas and method references
+ * targeting this type are serializable.
+ *
+ * @param <T> type of the tested value
+ */
+@FunctionalInterface
+public interface SPredicate<T> extends Predicate<T>, Serializable {
+    // No extra members: this type only adds Serializable to Predicate.
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java
new file mode 100644
index 0000000..eea254a
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+
+/**
+ * A {@link Supplier} that is also {@link Serializable}, so lambdas and method references
+ * targeting this type are serializable.
+ *
+ * @param <T> type of the supplied value
+ */
+@FunctionalInterface
+public interface SSupplier<T> extends Supplier<T>, Serializable {
+    // No extra members: this type only adds Serializable to Supplier.
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java
new file mode 100644
index 0000000..ad18232
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Serializable versions of the functional interfaces that ship with Java 8 in {@code java.util.function}.
+ */
+package org.apache.crunch.lambda.fn;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java
new file mode 100644
index 0000000..9c03148
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <p>Alternative Crunch API using Java 8 features to allow construction of pipelines using lambda functions and method
+ * references. It works by wrapping standard Crunch {@link org.apache.crunch.PCollection},
+ * {@link org.apache.crunch.PTable} and {@link org.apache.crunch.PGroupedTable} instances into the corresponding
+ * {@link org.apache.crunch.lambda.LCollection}, {@link org.apache.crunch.lambda.LTable} and
+ * {@link org.apache.crunch.lambda.LGroupedTable} types.</p>
+ *
+ * <p>The class {@link org.apache.crunch.lambda.Lambda} provides static methods to create these. Please also see the
+ * Javadocs for {@link org.apache.crunch.lambda.Lambda} for usage examples.</p>
+ */
+package org.apache.crunch.lambda;
+
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java
new file mode 100644
index 0000000..b819d0d
--- /dev/null
+++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.crunch.lambda.TestCommon.*;
+import static org.apache.crunch.lambda.TypedRecord.rec;
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.junit.Assert.*;
+
+public class LCollectionTest {
+
+    /** Builds the small in-memory collection of five records that most tests run against. */
+    private LCollection<TypedRecord> lc() {
+        return Lambda.wrap(MemPipeline.typedCollectionOf(Avros.reflects(TypedRecord.class),
+                rec(14, "Alice", 101L),
+                rec(25, "Bo B", 102L),
+                rec(21, "Char Lotte", 103L),
+                rec(28, "David", 104L),
+                rec(31, "Erik", 105L)));
+    }
+
+    /**
+     * Reads the current value of a MemPipeline counter. The counters come from a static accessor, so they are
+     * shared with every other test in the JVM; tests therefore assert on deltas rather than absolute values.
+     */
+    private static long counterValue(String group, String name) {
+        return MemPipeline.getCounters().findCounter(group, name).getValue();
+    }
+
+    @Test
+    public void testParallelDo() throws Exception {
+        LCollection<String> result = lc().parallelDo(ctx -> {
+            if (ctx.element().key > 26) {
+                ctx.emit(ctx.element().name);
+            }
+        }, strings());
+        assertCollectionOf(result, "David", "Erik");
+    }
+
+    @Test
+    public void testParallelDoPair() throws Exception {
+        LTable<Integer, String> result = lc().parallelDo(ctx -> {
+            if (ctx.element().key > 26) {
+                ctx.emit(Pair.of(ctx.element().key, ctx.element().name));
+            }
+        }, tableOf(ints(), strings()));
+        assertCollectionOf(result, Pair.of(28, "David"), Pair.of(31, "Erik"));
+    }
+
+    @Test
+    public void testMap() throws Exception {
+        assertCollectionOf(lc().map(r -> r.key, ints()), 14, 25, 21, 28, 31);
+    }
+
+    @Test
+    public void testMapPair() throws Exception {
+        assertCollectionOf(lc().map(r -> Pair.of(r.key, r.value), tableOf(ints(), longs())),
+                Pair.of(14, 101L),
+                Pair.of(25, 102L),
+                Pair.of(21, 103L),
+                Pair.of(28, 104L),
+                Pair.of(31, 105L));
+    }
+
+    @Test
+    public void testFlatMap() throws Exception {
+        assertCollectionOf(
+                lc().flatMap(s -> Arrays.stream(s.name.split(" ")), strings()),
+                "Alice", "Bo", "B", "Char", "Lotte", "David", "Erik");
+    }
+
+    @Test
+    public void testFilterMap() throws Exception {
+        Map<String, String> lookupMap = ImmutableMap.of("Erik", "BOOM", "Alice", "POW");
+        // Names missing from the map yield Optional.empty() and are dropped by filterMap.
+        assertCollectionOf(
+                lc().filterMap(r -> Optional.ofNullable(lookupMap.get(r.name)), strings()),
+                "BOOM", "POW");
+    }
+
+    @Test
+    public void testFilter() throws Exception {
+        assertCollectionOf(lc().filter(r -> r.key == 21), rec(21, "Char Lotte", 103L));
+    }
+
+    @Test
+    public void testIncrement() throws Exception {
+        long before = counterValue("hello", "world");
+        lc().increment("hello", "world");
+        assertEquals(5L, counterValue("hello", "world") - before);
+    }
+
+    @Test
+    public void testIncrementIf() throws Exception {
+        long before = counterValue("hello", "conditional_world");
+        lc().incrementIf("hello", "conditional_world", r -> r.key < 25);
+        assertEquals(2L, counterValue("hello", "conditional_world") - before);
+    }
+
+    @Test
+    public void testBy() throws Exception {
+        assertCollectionOf(
+                lc().filter(r -> r.key == 21).by(r -> r.key, ints()),
+                Pair.of(21, rec(21, "Char Lotte", 103L)));
+    }
+
+    @Test
+    public void testCount() throws Exception {
+        assertCollectionOf(
+                Lambda.wrap(MemPipeline.typedCollectionOf(strings(), "a", "a", "a", "b", "b")).count(),
+                Pair.of("a", 3L),
+                Pair.of("b", 2L));
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java
new file mode 100644
index 0000000..043387e
--- /dev/null
+++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.crunch.Pair;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.crunch.lambda.TestCommon.assertCollectionOf;
+import static org.apache.crunch.types.avro.Avros.*;
+
+
+public class LGroupedTableTest {
+
+ LGroupedTable<String, Integer> lgt = Lambda.wrap(MemPipeline.typedTableOf(tableOf(strings(), ints()),
+ "a", 2,
+ "a", 3,
+ "b", 5,
+ "c", 7,
+ "c", 11,
+ "c", 13,
+ "c", 13))
+ .groupByKey();
+
+ @Test
+ public void testCombineValues() throws Exception {
+ assertCollectionOf(lgt.combineValues(Aggregators.MAX_INTS()),
+ Pair.of("a", 3),
+ Pair.of("b", 5),
+ Pair.of("c", 13));
+ }
+
+ @Test
+ public void testCombineValues1() throws Exception {
+ assertCollectionOf(lgt.combineValues(() -> Integer.MIN_VALUE, Integer::max, Collections::singleton),
+ Pair.of("a", 3),
+ Pair.of("b", 5),
+ Pair.of("c", 13));
+ }
+
+ @Test
+ public void testMapValues() throws Exception {
+ assertCollectionOf(lgt.mapValues(vs -> vs.map(i -> i.toString()).reduce((a, b) -> a + "," + b).get(), strings()),
+ Pair.of("a", "2,3"),
+ Pair.of("b", "5"),
+ Pair.of("c", "7,11,13,13"));
+ }
+
+ @Test
+ public void testCollectValues() throws Exception {
+ assertCollectionOf(lgt.collectValues(ArrayList::new, Collection::add, collections(ints())),
+ Pair.of("a", ImmutableList.of(2,3)),
+ Pair.of("b", ImmutableList.of(5)),
+ Pair.of("c", ImmutableList.of(7, 11, 13, 13)));
+ }
+
+ @Test
+ public void testCollectAllValues() throws Exception {
+ assertCollectionOf(lgt.collectAllValues(),
+ Pair.of("a", ImmutableList.of(2,3)),
+ Pair.of("b", ImmutableList.of(5)),
+ Pair.of("c", ImmutableList.of(7, 11, 13, 13)));
+ }
+
+ @Test
+ public void testCollectUniqueValues() throws Exception {
+ assertCollectionOf(lgt.collectUniqueValues(),
+ Pair.of("a", ImmutableSet.of(2, 3)),
+ Pair.of("b", ImmutableSet.of(5)),
+ Pair.of("c", ImmutableSet.of(7, 11, 13)));
+ }
+
+ @Test
+ public void testReduceValues() throws Exception {
+ assertCollectionOf(lgt.reduceValues((a, b) -> a * b),
+ Pair.of("a", 6),
+ Pair.of("b", 5),
+ Pair.of("c", 7 * 11 * 13 * 13)
+ );
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java
new file mode 100644
index 0000000..f66ada5
--- /dev/null
+++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import static org.apache.crunch.lambda.TestCommon.assertCollectionOf;
+import static org.apache.crunch.types.avro.Avros.*;
+
+
+public class LTableTest {
+
+ private LTable<String, Integer> lt1 = Lambda.wrap(MemPipeline.typedTableOf(tableOf(strings(), ints()),
+ "a", 2,
+ "a", 3,
+ "b", 5,
+ "c", 7,
+ "c", 11,
+ "c", 13,
+ "c", 13));
+
+ private LTable<String, Long> lt2 = Lambda.wrap(MemPipeline.typedTableOf(tableOf(strings(), longs()),
+ "a", 101L,
+ "b", 102L,
+ "c", 103L
+ ));
+
+ @Test
+ public void testKeys() throws Exception {
+ assertCollectionOf(lt1.keys(), "a", "a", "b", "c", "c", "c", "c");
+ }
+
+ @Test
+ public void testValues() throws Exception {
+ assertCollectionOf(lt2.values(), 101L, 102L, 103L);
+ }
+
+ @Test
+ public void testMapKeys() throws Exception {
+ assertCollectionOf(lt2.mapKeys(String::toUpperCase, strings()),
+ Pair.of("A", 101L),
+ Pair.of("B", 102L),
+ Pair.of("C", 103L)
+ );
+ }
+
+ @Test
+ public void testMapValues() throws Exception {
+ assertCollectionOf(lt2.mapValues(v -> v * 2, longs()),
+ Pair.of("a", 202L),
+ Pair.of("b", 204L),
+ Pair.of("c", 206L)
+ );
+ }
+
+ @Test
+ public void testJoin() throws Exception {
+ assertCollectionOf(lt1.join(lt2).values(),
+ Pair.of(2, 101L),
+ Pair.of(3, 101L),
+ Pair.of(5, 102L),
+ Pair.of(7, 103L),
+ Pair.of(11, 103L),
+ Pair.of(13, 103L),
+ Pair.of(13, 103L));
+ }
+
+ @Test
+ public void testCogroup() throws Exception {
+ assertCollectionOf(lt1.cogroup(lt2).values(),
+ Pair.of(ImmutableList.of(2, 3), ImmutableList.of(101L)),
+ Pair.of(ImmutableList.of(5), ImmutableList.of(102L)),
+ Pair.of(ImmutableList.of(7, 11, 13, 13), ImmutableList.of(103L))
+ );
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java
new file mode 100644
index 0000000..02101ab
--- /dev/null
+++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;
+
+public class TestCommon {
+ @SafeVarargs
+ public static <T> void assertCollectionOf(LCollection<T> actual, T... expected) {
+ Set<T> actualSet = actual.materialize().collect(Collectors.toSet());
+ Set<T> expectedSet = Sets.newHashSet(expected);
+ assertEquals(expectedSet, actualSet);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java
new file mode 100644
index 0000000..42540de
--- /dev/null
+++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+public class TypedRecord {
+ public int key;
+ public String name;
+ public long value;
+ public static TypedRecord rec(int key, String name, long value) {
+ TypedRecord record = new TypedRecord();
+ record.key = key;
+ record.name = name;
+ record.value = value;
+ return record;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TypedRecord that = (TypedRecord) o;
+
+ if (key != that.key) return false;
+ if (value != that.value) return false;
+ return name != null ? name.equals(that.name) : that.name == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = key;
+ result = 31 * result + (name != null ? name.hashCode() : 0);
+ result = 31 * result + (int) (value ^ (value >>> 32));
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0758e43..b628a77 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,17 @@ under the License.
<module>crunch-hive</module>
<module>crunch-dist</module>
</modules>
+ <profiles>
+ <profile>
+ <id>java-8</id>
+ <activation>
+ <jdk>[1.8,]</jdk>
+ </activation>
+ <modules>
+ <module>crunch-lambda</module>
+ </modules>
+ </profile>
+ </profiles>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -163,6 +174,12 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.apache.crunch</groupId>
+ <artifactId>crunch-lambda</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
[2/2] crunch git commit: Java 8 lambda support for Apache Crunch.
Posted by da...@apache.org.
Java 8 lambda support for Apache Crunch.
Remove lambda support from crunch-core, and instead implement a new module called crunch-lambda.
This will allow full use of Java 8 features in implementing support for lambda expressions and
method references, without requiring a dependency on Java 8 for crunch-core. Pthings are wrapped
into analogous Lthings which can be operated on with an API inspired both by the existing Crunch
API and the Java 8 streams API.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/7d7af4ef
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/7d7af4ef
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/7d7af4ef
Branch: refs/heads/master
Commit: 7d7af4ef43b122cc03cee721b9106d174d71d435
Parents: f8920d3
Author: David Whiting <da...@apache.org>
Authored: Sat Jan 2 11:28:34 2016 +0100
Committer: David Whiting <da...@apache.org>
Committed: Sun Jan 10 21:29:49 2016 +0100
----------------------------------------------------------------------
.../org/apache/crunch/MultiStagePlanningIT.java | 2 +-
.../it/java/org/apache/crunch/PageRankIT.java | 2 +-
.../it/java/org/apache/crunch/WordCountIT.java | 12 +-
.../src/main/java/org/apache/crunch/IDoFn.java | 49 ----
.../main/java/org/apache/crunch/IFilterFn.java | 27 --
.../main/java/org/apache/crunch/IFlatMapFn.java | 28 ---
.../src/main/java/org/apache/crunch/IMapFn.java | 27 --
.../java/org/apache/crunch/PCollection.java | 96 +-------
.../java/org/apache/crunch/PGroupedTable.java | 11 -
.../src/main/java/org/apache/crunch/PTable.java | 24 --
.../java/org/apache/crunch/fn/IFnHelpers.java | 149 -----------
.../impl/dist/collect/BaseGroupedTable.java | 9 -
.../impl/dist/collect/PCollectionImpl.java | 92 +------
.../crunch/impl/dist/collect/PTableBase.java | 22 --
.../crunch/impl/mem/collect/MemCollection.java | 89 -------
.../impl/mem/collect/MemGroupedTable.java | 7 -
.../crunch/impl/mem/collect/MemTable.java | 23 --
crunch-dist/pom.xml | 4 +
crunch-lambda/pom.xml | 67 +++++
.../org/apache/crunch/lambda/LAggregator.java | 57 +++++
.../org/apache/crunch/lambda/LCollection.java | 244 +++++++++++++++++++
.../crunch/lambda/LCollectionFactory.java | 44 ++++
.../crunch/lambda/LCollectionFactoryImpl.java | 70 ++++++
.../java/org/apache/crunch/lambda/LDoFn.java | 31 +++
.../org/apache/crunch/lambda/LDoFnContext.java | 52 ++++
.../org/apache/crunch/lambda/LDoFnWrapper.java | 106 ++++++++
.../org/apache/crunch/lambda/LGroupedTable.java | 162 ++++++++++++
.../java/org/apache/crunch/lambda/LTable.java | 188 ++++++++++++++
.../java/org/apache/crunch/lambda/Lambda.java | 59 +++++
.../apache/crunch/lambda/fn/SBiConsumer.java | 28 +++
.../apache/crunch/lambda/fn/SBiFunction.java | 28 +++
.../crunch/lambda/fn/SBinaryOperator.java | 28 +++
.../org/apache/crunch/lambda/fn/SConsumer.java | 28 +++
.../org/apache/crunch/lambda/fn/SFunction.java | 28 +++
.../org/apache/crunch/lambda/fn/SPredicate.java | 28 +++
.../org/apache/crunch/lambda/fn/SSupplier.java | 28 +++
.../apache/crunch/lambda/fn/package-info.java | 22 ++
.../org/apache/crunch/lambda/package-info.java | 30 +++
.../apache/crunch/lambda/LCollectionTest.java | 128 ++++++++++
.../apache/crunch/lambda/LGroupedTableTest.java | 103 ++++++++
.../org/apache/crunch/lambda/LTableTest.java | 94 +++++++
.../org/apache/crunch/lambda/TestCommon.java | 34 +++
.../org/apache/crunch/lambda/TypedRecord.java | 52 ++++
pom.xml | 17 ++
44 files changed, 1771 insertions(+), 658 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java b/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
index 38211a7..a7b7d48 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
@@ -60,7 +60,7 @@ public class MultiStagePlanningIT implements Serializable {
PTable<String, String> addressesTable = pipeline.readTextFile(addressesFile)
.parallelDo("Split addresses", new StringToPairMapFn(), tableOf(strings(), strings()))
- .filter(new IFilterFn<Pair<String, String>>() {
+ .filter(new FilterFn<Pair<String, String>>() {
@Override
public boolean accept(Pair<String, String> input) {
// This is odd but it is the simpler way of simulating this would take longer than
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
index b30465d..701f78a 100644
--- a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
@@ -130,7 +130,7 @@ public class PageRankIT {
}, ptf.tableOf(ptf.strings(), ptf.floats()));
return input.cogroup(outbound).mapValues(
- new IMapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
+ new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
@Override
public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) {
PageRankData prd = Iterables.getOnlyElement(input.first());
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
index 4c77c41..e0bd719 100644
--- a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
@@ -39,6 +39,7 @@ import org.apache.crunch.types.PTypes;
import org.apache.crunch.types.avro.AvroTypeFamily;
import org.apache.crunch.types.writable.WritableTypeFamily;
import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.junit.Rule;
import org.junit.Test;
@@ -55,17 +56,18 @@ public class WordCountIT {
}
public static PTable<String, Long> wordCount(PCollection<String> words, PTypeFamily typeFamily) {
- return Aggregate.count(words.parallelDo(new IDoFn<String, String>() {
+ return Aggregate.count(words.parallelDo(new DoFn<String, String>() {
@Override
- public void process(Context<String, String> context) {
- List<String> words = Arrays.asList(context.element().split("\\s+"));
+ public void process(String input, Emitter<String> emitter) {
+ List<String> words = Arrays.asList(input.split("\\s+"));
for (String word : words) {
if ("and".equals(word)) {
- context.increment(WordCountStats.ANDS);
+ increment(WordCountStats.ANDS);
}
- context.emit(word);
+ emitter.emit(word);
}
}
+
}, typeFamily.strings()));
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IDoFn.java b/crunch-core/src/main/java/org/apache/crunch/IDoFn.java
deleted file mode 100644
index b393f43..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/IDoFn.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-import java.io.Serializable;
-
-/**
- * A Java lambdas friendly version of the {@link DoFn} class.
- */
-public interface IDoFn<S, T> extends Serializable {
-
- void process(Context<S, T> context);
-
- public interface Context<S, T> {
- S element();
-
- void emit(T t);
-
- TaskInputOutputContext getContext();
-
- Configuration getConfiguration();
-
- void increment(String groupName, String counterName);
-
- void increment(String groupName, String counterName, long value);
-
- void increment(Enum<?> counterName);
-
- void increment(Enum<?> counterName, long value);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java b/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java
deleted file mode 100644
index bb8a03d..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.Serializable;
-
-/**
- * A Java lambdas friendly version of the {@link org.apache.crunch.FilterFn} class.
- */
-public interface IFilterFn<S> extends Serializable {
- boolean accept(S input);
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java b/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java
deleted file mode 100644
index a2b85c4..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.Serializable;
-
-/**
- * A Java lambdas friendly interface for writing business logic against {@code PCollection}s
- * that take in a single input record and return 0 to N output records via an {@code Iterable}.
- */
-public interface IFlatMapFn<S, T> extends Serializable {
- Iterable<T> process(S input);
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IMapFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IMapFn.java b/crunch-core/src/main/java/org/apache/crunch/IMapFn.java
deleted file mode 100644
index 3c06d9e..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/IMapFn.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.Serializable;
-
-/**
- * A Java lambdas friendly version of the {@link org.apache.crunch.MapFn} class.
- */
-public interface IMapFn<S, T> extends Serializable {
- public T map(S input);
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
index 5d072e6..8043349 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
@@ -132,95 +132,6 @@ public interface PCollection<S> {
<K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type,
ParallelDoOptions options);
- /**
- * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
- */
- <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type);
-
- /**
- * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
- */
- <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
- /**
- * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
- */
- <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type);
-
- /**
- * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
- */
- <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
- /**
- * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
- */
- <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options);
-
- /**
- * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
- */
- <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options);
-
- /**
- * For each element of this {@code PCollection}, generate 0 to N output values using the
- * given {@code IFlatMapFn}. Designed for Java lambdas.
- */
- <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type);
-
- /**
- * For each element of this {@code PCollection}, generate 0 to N output values using the
- * given {@code IFlatMapFn}. Designed for Java lambdas.
- */
- <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
- /**
- * For each element of this {@code PCollection}, generate 0 to N output values using the
- * given {@code IFlatMapFn}. Designed for Java lambdas.
- */
- <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type);
-
- /**
- * For each element of this {@code PCollection}, generate 0 to N output values using the
- * given {@code IFlatMapFn}. Designed for Java lambdas.
- */
- <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
- /**
- * For each element of this {@code PCollection}, generate one output value using the
- * given {@code IMapFn}. Designed for Java lambdas.
- */
- <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type);
-
- /**
- * For each element of this {@code PCollection}, generate one output value using the
- * given {@code IMapFn}. Designed for Java lambdas.
- */
- <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
- /**
- * For each element of this {@code PCollection}, generate one output value using the
- * given {@code IMapFn}. Designed for Java lambdas.
- */
- <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type);
-
- /**
- * For each element of this {@code PCollection}, generate one output value using the
- * given {@code IMapFn}. Designed for Java lambdas.
- */
- <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
- /**
- * Filter elements of this {@code PCollection} using the given {@code IFilterFn}.
- * Designed for Java lambdas.
- */
- PCollection<S> filter(IFilterFn<S> fn);
-
- /**
- * Filter elements of this {@code PCollection} using the given {@code IFilterFn}.
- * Designed for Java lambdas.
- */
- PCollection<S> filter(String name, IFilterFn<S> fn);
/**
* Write the contents of this {@code PCollection} to the given {@code Target},
@@ -349,12 +260,6 @@ public interface PCollection<S> {
<K> PTable<K, S> by(MapFn<S, K> extractKeyFn, PType<K> keyType);
/**
- * Apply the given {@code IMapFn} to each element of this instance in order to
- * create a {@code PTable}. Designed for use with Java 8 lambdas.
- */
- <K> PTable<K, S> by(IMapFn<S, K> extractKeyFn, PType<K> keyType);
-
- /**
* Apply the given map function to each element of this instance in order to
* create a {@code PTable}.
*
@@ -385,4 +290,5 @@ public interface PCollection<S> {
* Returns a {@code PCollection} that contains the result of aggregating all values in this instance.
*/
PCollection<S> aggregate(Aggregator<S> aggregator);
+
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
index 6ac86de..756855c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
@@ -79,17 +79,6 @@ public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
*/
<U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype);
-
- /**
- * Maps the {@code Iterable<V>} elements of each record to a new type. Just like
- * any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be
- * called once. Designed for Java lambdas
- * @param mapFn The mapping function (can be lambda/method ref)
- * @param ptype The serialization infromation for the returned data
- * @return A new {@code PTable} instance
- */
- <U> PTable<K, U> mapValues(IMapFn<Iterable<V>, U> mapFn, PType<U> ptype);
-
/**
* Maps the {@code Iterable<V>} elements of each record to a new type. Just like
* any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/PTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PTable.java b/crunch-core/src/main/java/org/apache/crunch/PTable.java
index 5609c3f..74cade8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PTable.java
@@ -107,12 +107,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
/**
* Returns a {@code PTable} that has the same keys as this instance, but
- * uses the given function to map the values. Designed for Java lambdas.
- */
- <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype);
-
- /**
- * Returns a {@code PTable} that has the same keys as this instance, but
* uses the given function to map the values.
*/
<U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype);
@@ -125,12 +119,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
/**
* Returns a {@code PTable} that has the same values as this instance, but
- * uses the given function to map the keys. Designed for Java lambdas.
- */
- <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype);
-
- /**
- * Returns a {@code PTable} that has the same values as this instance, but
* uses the given function to map the keys.
*/
<K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype);
@@ -149,12 +137,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
/**
* Apply the given filter function to this instance and return the resulting
- * {@code PTable}. Designed for Java lambdas.
- */
- PTable<K, V> filter(IFilterFn<Pair<K, V>> fn);
-
- /**
- * Apply the given filter function to this instance and return the resulting
* {@code PTable}.
*
* @param name
@@ -165,12 +147,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn);
/**
- * Apply the given filter function to this instance and return the resulting
- * {@code PTable}. Designed for Java lambdas.
- */
- PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> fn);
-
- /**
* Returns a PTable made up of the pairs in this PTable with the largest value
* field.
*
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java b/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java
deleted file mode 100644
index 8560fab..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.FilterFn;
-import org.apache.crunch.IDoFn;
-import org.apache.crunch.IFilterFn;
-import org.apache.crunch.IFlatMapFn;
-import org.apache.crunch.IMapFn;
-import org.apache.crunch.MapFn;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-public class IFnHelpers {
-
- public static <S, T> DoFn<S, T> wrap(final org.apache.crunch.IDoFn<S, T> fn) {
- return new IDoFnWrapper(fn);
- }
-
- public static <S, T> DoFn<S, T> wrapFlatMap(final IFlatMapFn<S, T> fn) {
- return new DoFn<S, T>() {
- @Override
- public void process(S input, Emitter<T> emitter) {
- for (T t : fn.process(input)) {
- emitter.emit(t);
- }
- }
- };
- }
-
- public static <S, T> MapFn<S, T> wrapMap(final IMapFn<S, T> fn) {
- return new MapFn<S, T>() {
- @Override
- public T map(S input) {
- return fn.map(input);
- }
- };
- }
-
- public static <S> FilterFn<S> wrapFilter(final IFilterFn<S> fn) {
- return new FilterFn<S>() {
- @Override
- public boolean accept(S input) {
- return fn.accept(input);
- }
- };
- }
-
- static class IDoFnWrapper<S, T> extends DoFn<S, T> {
-
- private final org.apache.crunch.IDoFn<S, T> fn;
- private transient ContextImpl<S, T> ctxt;
-
- public IDoFnWrapper(org.apache.crunch.IDoFn<S, T> fn) {
- this.fn = fn;
- }
-
- @Override
- public void initialize() {
- super.initialize();
- if (getContext() == null) {
- this.ctxt = new ContextImpl<S, T>(getConfiguration());
- } else {
- this.ctxt = new ContextImpl<S, T>(getContext());
- }
- }
-
- @Override
- public void process(S input, Emitter<T> emitter) {
- fn.process(ctxt.update(input, emitter));
- }
- }
-
- static class ContextImpl<S, T> implements IDoFn.Context<S, T> {
- private S element;
- private Emitter<T> emitter;
- private TaskInputOutputContext context;
- private Configuration conf;
-
- public ContextImpl(TaskInputOutputContext context) {
- this.context = context;
- this.conf = context.getConfiguration();
- }
-
- public ContextImpl(Configuration conf) {
- this.context = null;
- this.conf = conf;
- }
-
- public ContextImpl update(S element, Emitter<T> emitter) {
- this.element = element;
- this.emitter = emitter;
- return this;
- }
-
- public S element() {
- return element;
- }
-
- public void emit(T t) {
- emitter.emit(t);
- }
-
- public TaskInputOutputContext getContext() {
- return context;
- }
-
- public Configuration getConfiguration() {
- return conf;
- }
-
- public void increment(String groupName, String counterName) {
- increment(groupName, counterName, 1);
- }
-
- public void increment(String groupName, String counterName, long value) {
- if (context != null) {
- context.getCounter(groupName, counterName).increment(value);
- }
- }
-
- public void increment(Enum<?> counterName) {
- increment(counterName, 1);
- }
-
- public void increment(Enum<?> counterName, long value) {
- if (context != null) {
- context.getCounter(counterName).increment(value);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
index eb2d829..7bfacdf 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
@@ -25,16 +25,13 @@ import org.apache.crunch.CombineFn;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.IMapFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.ReadableData;
-import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
import org.apache.crunch.fn.Aggregators;
-import org.apache.crunch.fn.IFnHelpers;
import org.apache.crunch.lib.PTables;
import org.apache.crunch.types.PGroupedTableType;
import org.apache.crunch.types.PType;
@@ -121,12 +118,6 @@ public class BaseGroupedTable<K, V> extends PCollectionImpl<Pair<K, Iterable<V>>
}
@Override
- public <U> PTable<K, U> mapValues(IMapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
- return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype);
- }
-
-
- @Override
public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
return PTables.mapValues(name, this, mapFn, ptype);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
index 2a5e1f5..7650ff5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
@@ -25,10 +25,6 @@ import org.apache.crunch.CachingOptions;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.DoFn;
import org.apache.crunch.FilterFn;
-import org.apache.crunch.IFilterFn;
-import org.apache.crunch.IFlatMapFn;
-import org.apache.crunch.IDoFn;
-import org.apache.crunch.IMapFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PObject;
@@ -40,7 +36,6 @@ import org.apache.crunch.ReadableData;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
import org.apache.crunch.fn.ExtractKeyFn;
-import org.apache.crunch.fn.IFnHelpers;
import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.impl.dist.DistributedPipeline;
import org.apache.crunch.io.ReadableSource;
@@ -145,7 +140,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
}
@Override
- public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
+ public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
return parallelDo(name, fn, type, ParallelDoOptions.builder().build());
}
@@ -171,86 +166,6 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
return pipeline.getFactory().createDoTable(name, getChainingCollection(), fn, type, options);
}
- @Override
- public <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type) {
- return parallelDo(IFnHelpers.wrap(fn), type);
- }
-
- @Override
- public <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
- return parallelDo(IFnHelpers.wrap(fn), type);
- }
-
- @Override
- public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type) {
- return parallelDo(name, IFnHelpers.wrap(fn), type);
- }
-
- @Override
- public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
- return parallelDo(name, IFnHelpers.wrap(fn), type);
- }
-
- @Override
- public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options) {
- return parallelDo(name, IFnHelpers.wrap(fn), type, options);
- }
-
- @Override
- public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options) {
- return parallelDo(name, IFnHelpers.wrap(fn), type, options);
- }
-
- @Override
- public <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type) {
- return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
- }
-
- @Override
- public <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
- return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
- }
-
- @Override
- public <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type) {
- return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
- }
-
- @Override
- public <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
- return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
- }
-
- @Override
- public <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type) {
- return parallelDo(IFnHelpers.wrapMap(fn), type);
- }
-
- @Override
- public <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
- return parallelDo(IFnHelpers.wrapMap(fn), type);
- }
-
- @Override
- public <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type) {
- return parallelDo(name, IFnHelpers.wrapMap(fn), type);
- }
-
- @Override
- public <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
- return parallelDo(name, IFnHelpers.wrapMap(fn), type);
- }
-
- @Override
- public PCollection<S> filter(IFilterFn<S> fn) {
- return filter(IFnHelpers.wrapFilter(fn));
- }
-
- @Override
- public PCollection<S> filter(String name, IFilterFn<S> fn) {
- return filter(name, IFnHelpers.wrapFilter(fn));
- }
-
public PCollection<S> write(Target target) {
if (materializedAt != null) {
getPipeline().write(
@@ -355,11 +270,6 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
}
@Override
- public <K> PTable<K, S> by(IMapFn<S, K> mapFn, PType<K> keyType) {
- return parallelDo(new ExtractKeyFn<K, S>(IFnHelpers.wrapMap(mapFn)), getTypeFamily().tableOf(keyType, getPType()));
- }
-
- @Override
public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
index 6bc3a41..4ba4d49 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
@@ -21,8 +21,6 @@ import com.google.common.collect.Lists;
import org.apache.crunch.CachingOptions;
import org.apache.crunch.FilterFn;
import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.IFilterFn;
-import org.apache.crunch.IMapFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PObject;
@@ -31,7 +29,6 @@ import org.apache.crunch.Pair;
import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.TableSource;
import org.apache.crunch.Target;
-import org.apache.crunch.fn.IFnHelpers;
import org.apache.crunch.impl.dist.DistributedPipeline;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.lib.Cogroup;
@@ -135,24 +132,10 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple
}
@Override
- public PTable<K, V> filter(IFilterFn<Pair<K, V>> filterFn) {
- return parallelDo(IFnHelpers.wrapFilter(filterFn), getPTableType());
- }
-
- @Override
- public PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> filterFn) {
- return parallelDo(name, IFnHelpers.wrapFilter(filterFn), getPTableType());
- }
-
- @Override
public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) {
return PTables.mapValues(this, mapFn, ptype);
}
- @Override
- public <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype) {
- return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype);
- }
@Override
public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) {
@@ -165,11 +148,6 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple
}
@Override
- public <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype) {
- return PTables.mapKeys(this, IFnHelpers.wrapMap(mapFn), ptype);
- }
-
- @Override
public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) {
return PTables.mapKeys(name, this, mapFn, ptype);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index 89671a3..087a31d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -21,7 +21,6 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
-import java.io.ObjectStreamClass;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Set;
@@ -36,10 +35,6 @@ import org.apache.crunch.Aggregator;
import org.apache.crunch.CachingOptions;
import org.apache.crunch.DoFn;
import org.apache.crunch.FilterFn;
-import org.apache.crunch.IFilterFn;
-import org.apache.crunch.IFlatMapFn;
-import org.apache.crunch.IDoFn;
-import org.apache.crunch.IMapFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PObject;
@@ -51,7 +46,6 @@ import org.apache.crunch.ReadableData;
import org.apache.crunch.PipelineCallable;
import org.apache.crunch.Target;
import org.apache.crunch.fn.ExtractKeyFn;
-import org.apache.crunch.fn.IFnHelpers;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
import org.apache.crunch.lib.Aggregate;
@@ -205,84 +199,6 @@ public class MemCollection<S> implements PCollection<S> {
}
@Override
- public <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type) {
- return parallelDo(null, fn, type);
- }
-
- @Override
- public <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
- return parallelDo(null, fn, type);
- }
-
- @Override
- public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type) {
- return parallelDo(name, fn, type, null);
- }
-
- @Override
- public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
- return parallelDo(name, fn, type, null);
- }
-
- @Override
- public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options) {
- return parallelDo(name, IFnHelpers.wrap(fn), type, options);
- }
-
- @Override
- public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options) {
- return parallelDo(name, IFnHelpers.wrap(fn), type, options);
- }
-
- @Override
- public <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type) {
- return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
- }
-
- @Override
- public <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
- return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
- }
-
- @Override
- public <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type) {
- return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
- }
-
- @Override
- public <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
- return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
- }
-
- @Override
- public <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type) {
- return parallelDo(IFnHelpers.wrapMap(fn), type);
- }
-
- @Override
- public <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
- return parallelDo(IFnHelpers.wrapMap(fn), type);
- }
-
- @Override
- public <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type) {
- return parallelDo(name, IFnHelpers.wrapMap(fn), type);
- }
-
- @Override
- public <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
- return parallelDo(name, IFnHelpers.wrapMap(fn), type);
- }
-
- public PCollection<S> filter(IFilterFn<S> fn) {
- return filter(IFnHelpers.wrapFilter(fn));
- }
-
- public PCollection<S> filter(String name, IFilterFn<S> fn) {
- return filter(name, IFnHelpers.wrapFilter(fn));
- }
-
- @Override
public PCollection<S> write(Target target) {
getPipeline().write(this, target);
return this;
@@ -404,11 +320,6 @@ public class MemCollection<S> implements PCollection<S> {
}
@Override
- public <K> PTable<K, S> by(IMapFn<S, K> mapFn, PType<K> keyType) {
- return parallelDo(new ExtractKeyFn<K, S>(IFnHelpers.wrapMap(mapFn)), getTypeFamily().tableOf(keyType, getPType()));
- }
-
- @Override
public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index 5451533..e8bf5e6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -22,7 +22,6 @@ import org.apache.crunch.CombineFn;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.IMapFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
@@ -30,7 +29,6 @@ import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Target;
import org.apache.crunch.fn.Aggregators;
-import org.apache.crunch.fn.IFnHelpers;
import org.apache.crunch.lib.PTables;
import org.apache.crunch.types.PGroupedTableType;
import org.apache.crunch.types.PTableType;
@@ -125,11 +123,6 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
}
@Override
- public <U> PTable<K, U> mapValues(IMapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
- return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype);
- }
-
- @Override
public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
return PTables.mapValues(name, this, mapFn, ptype);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
index 03b5a70..b90b656 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
@@ -24,8 +24,6 @@ import com.google.common.collect.ImmutableList;
import org.apache.crunch.CachingOptions;
import org.apache.crunch.FilterFn;
import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.IFilterFn;
-import org.apache.crunch.IMapFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
@@ -33,7 +31,6 @@ import org.apache.crunch.PObject;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Target;
-import org.apache.crunch.fn.IFnHelpers;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.lib.Cogroup;
import org.apache.crunch.lib.Join;
@@ -138,26 +135,11 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
}
@Override
- public PTable<K, V> filter(IFilterFn<Pair<K, V>> filterFn) {
- return parallelDo(IFnHelpers.wrapFilter(filterFn), getPTableType());
- }
-
- @Override
- public PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> filterFn) {
- return parallelDo(name, IFnHelpers.wrapFilter(filterFn), getPTableType());
- }
-
- @Override
public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) {
return PTables.mapValues(this, mapFn, ptype);
}
@Override
- public <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype) {
- return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype);
- }
-
- @Override
public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) {
return PTables.mapValues(name, this, mapFn, ptype);
}
@@ -168,11 +150,6 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
}
@Override
- public <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype) {
- return PTables.mapKeys(this, IFnHelpers.wrapMap(mapFn), ptype);
- }
-
- @Override
public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) {
return PTables.mapKeys(name, this, mapFn, ptype);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-dist/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-dist/pom.xml b/crunch-dist/pom.xml
index b7cd0e9..48d9b05 100644
--- a/crunch-dist/pom.xml
+++ b/crunch-dist/pom.xml
@@ -61,6 +61,10 @@ under the License.
<groupId>org.apache.crunch</groupId>
<artifactId>crunch-contrib</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.crunch</groupId>
+ <artifactId>crunch-lambda</artifactId>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-lambda/pom.xml b/crunch-lambda/pom.xml
new file mode 100644
index 0000000..e910517
--- /dev/null
+++ b/crunch-lambda/pom.xml
@@ -0,0 +1,67 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.crunch</groupId>
+ <artifactId>crunch-parent</artifactId>
+ <version>0.14.0-SNAPSHOT</version>
+ </parent>
+ <properties>
+ <java.source.version>1.8</java.source.version>
+ <java.target.version>1.8</java.target.version>
+ </properties>
+
+ <artifactId>crunch-lambda</artifactId>
+ <name>Apache Crunch Lambda</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.crunch</groupId>
+ <artifactId>crunch-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.crunch</groupId>
+ <artifactId>crunch-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java
new file mode 100644
index 0000000..5b8611d
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.lambda.fn.SBiFunction;
+import org.apache.crunch.lambda.fn.SFunction;
+import org.apache.crunch.lambda.fn.SSupplier;
+
+/**
+ * An {@link Aggregators.SimpleAggregator} assembled from three functional-interface implementations.
+ * @param <V> Type of the values fed into, and emitted by, this aggregator
+ * @param <A> Type of the accumulator that holds intermediate state between updates
+ */
+public class LAggregator<V, A> extends Aggregators.SimpleAggregator<V> {
+
+  private final SSupplier<A> initialSupplier;
+  private final SBiFunction<A, V, A> combineFn;
+  private final SFunction<A, Iterable<V>> outputFn;
+  private A accumulator; // current state; replaced wholesale by reset() and update()
+
+  public LAggregator(SSupplier<A> initialSupplier, SBiFunction<A, V, A> combineFn, SFunction<A, Iterable<V>> outputFn) {
+    this.outputFn = outputFn;
+    this.combineFn = combineFn;
+    this.initialSupplier = initialSupplier;
+  }
+
+  /** Discards any previous state and starts over with a value obtained from the supplier. */
+  @Override
+  public void reset() { this.accumulator = this.initialSupplier.get(); }
+
+  /** Folds one value into the accumulator, replacing it with the combiner's result. */
+  @Override
+  public void update(V v) { this.accumulator = this.combineFn.apply(this.accumulator, v); }
+
+  /** Converts the current accumulator into the values this aggregator emits. */
+  @Override
+  public Iterable<V> results() {
+    final A current = this.accumulator;
+    return this.outputFn.apply(current);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
new file mode 100644
index 0000000..6a8dd62
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.*;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.lambda.fn.SFunction;
+import org.apache.crunch.lambda.fn.SPredicate;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Java 8 friendly version of the {@link PCollection} interface, allowing distributed operations to be expressed in
+ * terms of lambda expressions and method references, instead of creating a new class implementation for each operation.
+ * @param <S> The type of the elements in this collection
+ */
+public interface LCollection<S> {
+    /**
+     * Get the underlying {@link PCollection} for this LCollection
+     */
+    PCollection<S> underlying();
+
+    /**
+     * Get the {@link LCollectionFactory} which can be used to create new LCollection, LTable and LGroupedTable
+     * instances
+     */
+    LCollectionFactory factory();
+
+    /**
+     * Transform this LCollection using a standard Crunch {@link DoFn}
+     */
+    default <T> LCollection<T> parallelDo(DoFn<S, T> fn, PType<T> pType) {
+        return factory().wrap(underlying().parallelDo(fn, pType));
+    }
+
+    /**
+     * Transform this LCollection to an {@link LTable} using a standard Crunch {@link DoFn}
+     */
+    default <K, V> LTable<K, V> parallelDo(DoFn<S, Pair<K, V>> fn, PTableType<K, V> pType) {
+        return factory().wrap(underlying().parallelDo(fn, pType));
+    }
+
+    /**
+     * Transform this LCollection using a Lambda-friendly {@link LDoFn}.
+     */
+    default <T> LCollection<T> parallelDo(LDoFn<S, T> fn, PType<T> pType) {
+        return parallelDo(new LDoFnWrapper<>(fn), pType);
+    }
+
+    /**
+     * Transform this LCollection to an {@link LTable} using a Lambda-friendly {@link LDoFn}.
+     */
+    default <K, V> LTable<K, V> parallelDo(LDoFn<S, Pair<K, V>> fn, PTableType<K, V> pType) {
+        return parallelDo(new LDoFnWrapper<>(fn), pType);
+    }
+
+    /**
+     * Map the elements of this collection 1-1 through the supplied function.
+     */
+    default <T> LCollection<T> map(SFunction<S, T> fn, PType<T> pType) {
+        return parallelDo(ctx -> ctx.emit(fn.apply(ctx.element())), pType);
+    }
+
+    /**
+     * Map the elements of this collection 1-1 through the supplied function to yield an {@link LTable}
+     */
+    default <K, V> LTable<K, V> map(SFunction<S, Pair<K, V>> fn, PTableType<K, V> pType) {
+        return parallelDo(ctx -> ctx.emit(fn.apply(ctx.element())), pType);
+    }
+
+    /**
+     * Map each element to zero or more output elements using the provided stream-returning function.
+     */
+    default <T> LCollection<T> flatMap(SFunction<S, Stream<T>> fn, PType<T> pType) {
+        return parallelDo(ctx -> fn.apply(ctx.element()).forEach(ctx::emit), pType);
+    }
+
+    /**
+     * Map each element to zero or more output elements using the provided stream-returning function to yield an
+     * {@link LTable}
+     */
+    default <K, V> LTable<K, V> flatMap(SFunction<S, Stream<Pair<K, V>>> fn, PTableType<K, V> pType) {
+        return parallelDo(ctx -> fn.apply(ctx.element()).forEach(ctx::emit), pType);
+    }
+
+    /**
+     * Combination of a filter and map operation by using a function with {@link Optional} return type. Elements for
+     * which the function returns an empty Optional are dropped.
+     */
+    default <T> LCollection<T> filterMap(SFunction<S, Optional<T>> fn, PType<T> pType) {
+        return parallelDo(ctx -> fn.apply(ctx.element()).ifPresent(ctx::emit), pType);
+    }
+
+    /**
+     * Combination of a filter and map operation by using a function with {@link Optional} return type, yielding an
+     * {@link LTable}. Elements for which the function returns an empty Optional are dropped.
+     */
+    default <K, V> LTable<K, V> filterMap(SFunction<S, Optional<Pair<K, V>>> fn, PTableType<K, V> pType) {
+        return parallelDo(ctx -> fn.apply(ctx.element()).ifPresent(ctx::emit), pType);
+    }
+
+    /**
+     * Filter the collection using the supplied predicate, keeping only elements for which it returns true.
+     */
+    default LCollection<S> filter(SPredicate<S> predicate) {
+        return parallelDo(ctx -> {
+            if (predicate.test(ctx.element())) {
+                ctx.emit(ctx.element());
+            }
+        }, pType());
+    }
+
+    /**
+     * Union this LCollection with another LCollection of the same type
+     */
+    default LCollection<S> union(LCollection<S> other) {
+        return factory().wrap(underlying().union(other.underlying()));
+    }
+
+    /**
+     * Union this LCollection with a {@link PCollection} of the same type
+     */
+    default LCollection<S> union(PCollection<S> other) {
+        return factory().wrap(underlying().union(other));
+    }
+
+    /**
+     * Increment a counter for every element in the collection. Every element is passed through unchanged, so the
+     * returned LCollection has the same contents as this one.
+     */
+    default LCollection<S> increment(Enum<?> counter) {
+        return parallelDo(ctx -> {
+            ctx.increment(counter);
+            // Pass the element through; without this the resulting collection would be empty.
+            ctx.emit(ctx.element());
+        }, pType());
+    }
+
+    /**
+     * Increment a counter for every element in the collection. Every element is passed through unchanged, so the
+     * returned LCollection has the same contents as this one.
+     */
+    default LCollection<S> increment(String counterGroup, String counterName) {
+        return parallelDo(ctx -> {
+            ctx.increment(counterGroup, counterName);
+            ctx.emit(ctx.element());
+        }, pType());
+    }
+
+    /**
+     * Increment a counter for every element satisfying the conditional predicate supplied. All elements (matching
+     * or not) are passed through unchanged.
+     */
+    default LCollection<S> incrementIf(Enum<?> counter, SPredicate<S> condition) {
+        return parallelDo(ctx -> {
+            if (condition.test(ctx.element())) {
+                ctx.increment(counter);
+            }
+            ctx.emit(ctx.element());
+        }, pType());
+    }
+
+    /**
+     * Increment a counter for every element satisfying the conditional predicate supplied. All elements (matching
+     * or not) are passed through unchanged.
+     */
+    default LCollection<S> incrementIf(String counterGroup, String counterName, SPredicate<S> condition) {
+        return parallelDo(ctx -> {
+            if (condition.test(ctx.element())) {
+                ctx.increment(counterGroup, counterName);
+            }
+            ctx.emit(ctx.element());
+        }, pType());
+    }
+
+    /**
+     * Cache the underlying {@link PCollection}
+     */
+    default LCollection<S> cache() {
+        underlying().cache();
+        return this;
+    }
+
+    /**
+     * Cache the underlying {@link PCollection} using the given {@link CachingOptions}
+     */
+    default LCollection<S> cache(CachingOptions options) {
+        underlying().cache(options);
+        return this;
+    }
+
+    /**
+     * Key this LCollection by a key extracted from the element to yield a {@link LTable} mapping the key to the whole
+     * element.
+     */
+    default <K> LTable<K, S> by(SFunction<S, K> extractFn, PType<K> pType) {
+        return parallelDo(
+                ctx -> ctx.emit(Pair.of(extractFn.apply(ctx.element()), ctx.element())),
+                ptf().tableOf(pType, pType()));
+    }
+
+    /**
+     * Count distinct values in this LCollection, yielding an {@link LTable} mapping each value to the number
+     * of occurrences in the collection.
+     */
+    default LTable<S, Long> count() {
+        return map(a -> Pair.of(a, 1L), ptf().tableOf(pType(), ptf().longs()))
+                .groupByKey()
+                .combineValues(Aggregators.SUM_LONGS());
+    }
+
+    /**
+     * Obtain the contents of this LCollection as a {@link Stream} that can be processed locally. Note, this may trigger
+     * your job to execute in a distributed environment if the pipeline has not yet been run.
+     */
+    default Stream<S> materialize() {
+        return StreamSupport.stream(underlying().materialize().spliterator(), false);
+    }
+
+    /**
+     * Get the {@link PTypeFamily} representing how elements of this collection may be serialized.
+     */
+    default PTypeFamily ptf() {
+        return underlying().getPType().getFamily();
+    }
+
+    /**
+     * Get the {@link PType} representing how elements of this collection may be serialized.
+     */
+    default PType<S> pType() {
+        return underlying().getPType();
+    }
+
+    /**
+     * Write this collection to the specified {@link Target}
+     */
+    default LCollection<S> write(Target target) {
+        underlying().write(target);
+        return this;
+    }
+
+    /**
+     * Write this collection to the specified {@link Target} with the given {@link org.apache.crunch.Target.WriteMode}
+     */
+    default LCollection<S> write(Target target, Target.WriteMode writeMode) {
+        underlying().write(target, writeMode);
+        return this;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java
new file mode 100644
index 0000000..4b208d2
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+
+/**
+ * Converts Crunch's {@link PCollection}, {@link PTable} and {@link PGroupedTable} objects into their lambda-friendly
+ * counterparts {@link LCollection}, {@link LTable} and {@link LGroupedTable}. Application code normally does not use
+ * or implement this interface directly; start from the {@link Lambda} class instead.
+ */
+public interface LCollectionFactory {
+
+    /**
+     * Wraps the given {@link PCollection} in an {@link LCollection}.
+     */
+    <S> LCollection<S> wrap(PCollection<S> collection);
+
+    /**
+     * Wraps the given {@link PTable} in an {@link LTable}.
+     */
+    <K, V> LTable<K, V> wrap(PTable<K, V> table);
+
+    /**
+     * Wraps the given {@link PGroupedTable} in an {@link LGroupedTable}.
+     */
+    <K, V> LGroupedTable<K, V> wrap(PGroupedTable<K, V> groupedTable);
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java
new file mode 100644
index 0000000..4bedfa7
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+
+/**
+ * Default {@link LCollectionFactory}: each wrapper is a thin view that returns the wrapped Crunch object from
+ * {@code underlying()} and this factory from {@code factory()}.
+ */
+class LCollectionFactoryImpl implements LCollectionFactory {
+
+    @Override
+    public <S> LCollection<S> wrap(final PCollection<S> collection) {
+        return new CollectionView<>(collection, this);
+    }
+
+    @Override
+    public <K, V> LTable<K, V> wrap(final PTable<K, V> collection) {
+        return new TableView<>(collection, this);
+    }
+
+    @Override
+    public <K, V> LGroupedTable<K, V> wrap(final PGroupedTable<K, V> collection) {
+        return new GroupedTableView<>(collection, this);
+    }
+
+    /** LCollection view over a PCollection, remembering the factory that created it. */
+    private static final class CollectionView<S> implements LCollection<S> {
+        private final PCollection<S> delegate;
+        private final LCollectionFactory owner;
+
+        CollectionView(PCollection<S> delegate, LCollectionFactory owner) {
+            this.delegate = delegate;
+            this.owner = owner;
+        }
+
+        @Override
+        public PCollection<S> underlying() {
+            return delegate;
+        }
+
+        @Override
+        public LCollectionFactory factory() {
+            return owner;
+        }
+    }
+
+    /** LTable view over a PTable, remembering the factory that created it. */
+    private static final class TableView<K, V> implements LTable<K, V> {
+        private final PTable<K, V> delegate;
+        private final LCollectionFactory owner;
+
+        TableView(PTable<K, V> delegate, LCollectionFactory owner) {
+            this.delegate = delegate;
+            this.owner = owner;
+        }
+
+        @Override
+        public PTable<K, V> underlying() {
+            return delegate;
+        }
+
+        @Override
+        public LCollectionFactory factory() {
+            return owner;
+        }
+    }
+
+    /** LGroupedTable view over a PGroupedTable, remembering the factory that created it. */
+    private static final class GroupedTableView<K, V> implements LGroupedTable<K, V> {
+        private final PGroupedTable<K, V> delegate;
+        private final LCollectionFactory owner;
+
+        GroupedTableView(PGroupedTable<K, V> delegate, LCollectionFactory owner) {
+            this.delegate = delegate;
+            this.owner = owner;
+        }
+
+        @Override
+        public PGroupedTable<K, V> underlying() {
+            return delegate;
+        }
+
+        @Override
+        public LCollectionFactory factory() {
+            return owner;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java
new file mode 100644
index 0000000..1be8085
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.DoFn;
+
+import java.io.Serializable;
+
+/**
+ * A Java lambdas friendly version of the {@link DoFn} class.
+ *
+ * <p>This is a single-method interface so that it can be implemented with a lambda or method reference. It extends
+ * {@link Serializable} because the function is shipped with the job, so anything a lambda captures must be
+ * serializable too.</p>
+ *
+ * @param <S> input element type
+ * @param <T> output element type
+ */
+@FunctionalInterface
+public interface LDoFn<S, T> extends Serializable {
+
+    /**
+     * Process one input element, read from {@code context.element()}, and emit zero or more outputs via
+     * {@code context.emit(...)}.
+     */
+    void process(LDoFnContext<S, T> context);
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java
new file mode 100644
index 0000000..3743a2f
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Handle passed to an {@link LDoFn} for each invocation: exposes the current input element, a way to emit output,
+ * and access to the Hadoop task environment and its counters.
+ * @param <S> Input type of LDoFn
+ * @param <T> Output type of LDoFn
+ */
+public interface LDoFnContext<S, T> {
+
+    /** Returns the input element currently being processed. */
+    S element();
+
+    /** Sends {@code t} to the output of the operation. */
+    void emit(T t);
+
+    /** Returns the underlying {@link TaskInputOutputContext}, for the special cases the other methods don't cover. */
+    TaskInputOutputContext getContext();
+
+    /** Returns the current Hadoop {@link Configuration}. */
+    Configuration getConfiguration();
+
+    /** Adds 1 to the counter identified by {@code groupName} and {@code counterName}. */
+    void increment(String groupName, String counterName);
+
+    /** Adds {@code value} to the counter identified by {@code groupName} and {@code counterName}. */
+    void increment(String groupName, String counterName, long value);
+
+    /** Adds 1 to the counter identified by the enum constant {@code counterName}. */
+    void increment(Enum<?> counterName);
+
+    /** Adds {@code value} to the counter identified by the enum constant {@code counterName}. */
+    void increment(Enum<?> counterName, long value);
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java
new file mode 100644
index 0000000..76087d5
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Adapts a lambda-friendly {@link LDoFn} to Crunch's {@link DoFn} API.
+ *
+ * <p>A single {@link Context} is created in {@link #initialize()} and re-pointed at each input element and emitter
+ * in {@code process}, rather than allocating a new context object per element.</p>
+ */
+class LDoFnWrapper<S, T> extends DoFn<S, T> {
+
+    private final LDoFn<S, T> fn;
+    // Rebuilt by initialize() wherever the function runs; never part of the serialized DoFn.
+    private transient Context<S, T> ctxt;
+
+    public LDoFnWrapper(LDoFn<S, T> fn) {
+        this.fn = fn;
+    }
+
+    @Override
+    public void initialize() {
+        super.initialize();
+        if (getContext() == null) {
+            // No TaskInputOutputContext available (presumably not running inside a Hadoop task): only the
+            // Configuration is exposed, and counter increments become no-ops.
+            this.ctxt = new Context<>(getConfiguration());
+        } else {
+            this.ctxt = new Context<>(getContext());
+        }
+    }
+
+    @Override
+    public void process(S input, Emitter<T> emitter) {
+        fn.process(ctxt.update(input, emitter));
+    }
+
+    /**
+     * Mutable {@link LDoFnContext} implementation; {@link #update} swaps in the current element and emitter.
+     * Counter methods silently do nothing when no task context is available.
+     */
+    static class Context<S, T> implements LDoFnContext<S, T> {
+        private S element;
+        private Emitter<T> emitter;
+        private final TaskInputOutputContext context;
+        private final Configuration conf;
+
+        public Context(TaskInputOutputContext context) {
+            this.context = context;
+            this.conf = context.getConfiguration();
+        }
+
+        public Context(Configuration conf) {
+            this.context = null;
+            this.conf = conf;
+        }
+
+        /** Points this context at the next element/emitter pair and returns it for chaining. */
+        public Context<S, T> update(S element, Emitter<T> emitter) {
+            this.element = element;
+            this.emitter = emitter;
+            return this;
+        }
+
+        @Override
+        public S element() {
+            return element;
+        }
+
+        @Override
+        public void emit(T t) {
+            emitter.emit(t);
+        }
+
+        /** May return null when this context was built from a Configuration only. */
+        @Override
+        public TaskInputOutputContext getContext() {
+            return context;
+        }
+
+        @Override
+        public Configuration getConfiguration() {
+            return conf;
+        }
+
+        @Override
+        public void increment(String groupName, String counterName) {
+            increment(groupName, counterName, 1);
+        }
+
+        @Override
+        public void increment(String groupName, String counterName, long value) {
+            if (context != null) {
+                context.getCounter(groupName, counterName).increment(value);
+            }
+        }
+
+        @Override
+        public void increment(Enum<?> counterName) {
+            increment(counterName, 1);
+        }
+
+        @Override
+        public void increment(Enum<?> counterName, long value) {
+            if (context != null) {
+                context.getCounter(counterName).increment(value);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java
new file mode 100644
index 0000000..10209e0
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.Aggregator;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lambda.fn.SBiConsumer;
+import org.apache.crunch.lambda.fn.SBiFunction;
+import org.apache.crunch.lambda.fn.SBinaryOperator;
+import org.apache.crunch.lambda.fn.SFunction;
+import org.apache.crunch.lambda.fn.SSupplier;
+import org.apache.crunch.types.PType;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+/**
+ * Java 8 friendly version of the {@link PGroupedTable} interface, allowing distributed operations to be expressed in
+ * terms of lambda expressions and method references, instead of creating a new class implementation for each operation.
+ * @param <K> key type for this table
+ * @param <V> value type for this table
+ */
+public interface LGroupedTable<K, V> extends LCollection<Pair<K, Iterable<V>>> {
+    /**
+     * Get the underlying {@link PGroupedTable} for this LGroupedTable
+     */
+    PGroupedTable<K, V> underlying();
+
+    /**
+     * Combine the value part of the table using the provided Crunch {@link Aggregator}. This will be optimised into
+     * both a combine and reduce in the MapReduce implementation, with similar optimisations available for other
+     * implementations.
+     */
+    default LTable<K, V> combineValues(Aggregator<V> aggregator) {
+        return factory().wrap(underlying().combineValues(aggregator));
+    }
+
+    /**
+     * Combine the value part of the table using the given functions. The supplier is used to create a new aggregating
+     * type, the combineFn adds a value into the aggregate, and the output function transforms the aggregate into
+     * an iterable of the original value type. For example, summation can be expressed as follows:
+     *
+     * <pre>{@code myGroupedTable.combineValues(() -> 0, (sum, value) -> sum + value, Collections::singleton) }</pre>
+     *
+     * <p>This will be optimised into both a combine and reduce in the MapReduce implementation, with similar
+     * optimisations available for other implementations.</p>
+     */
+    default <A> LTable<K, V> combineValues(
+            SSupplier<A> initialSupplier,
+            SBiFunction<A, V, A> combineFn,
+            SFunction<A, Iterable<V>> outputFn) {
+        return combineValues(new LAggregator<>(initialSupplier, combineFn, outputFn));
+    }
+
+    /**
+     * Map the values in this LGroupedTable using a custom function. This function operates over a stream which can
+     * be consumed only once.
+     *
+     * <p>Note that in serialization systems which heavily reuse objects (such as Avro), you may
+     * in fact be given the same object multiple times with different data as you consume the stream, meaning it may
+     * be necessary to detach values.</p>
+     */
+    default <T> LTable<K, T> mapValues(SFunction<Stream<V>, T> fn, PType<T> pType) {
+        return parallelDo(
+                ctx -> ctx.emit(Pair.of(
+                        ctx.element().first(),
+                        fn.apply(StreamSupport.stream(ctx.element().second().spliterator(), false)))
+                ), ptf().tableOf(keyType(), pType));
+    }
+
+    /**
+     * Collect the values into an aggregate type. This differs from combineValues in that it outputs the aggregate type
+     * rather than the value type, and is designed to happen in one step (rather than being optimised into multiple
+     * levels). This makes it much more suitable for assembling collections than computing simple numeric aggregates.
+     *
+     * <p>The supplier provides an "empty" object, then the consumer is called with each value. For example, to collect
+     * all values into a {@link Collection}, one can do this:</p>
+     * <pre>{@code
+     * lgt.collectValues(ArrayList::new, Collection::add, lgt.ptf().collections(lgt.valueType()))
+     * }</pre>
+     *
+     * <p>This is in fact the default implementation for the collectAllValues() method.</p>
+     *
+     * <p>Note that in serialization systems which heavily reuse objects (such as Avro), you may
+     * in fact be given the same object multiple times with different data as you consume the values, meaning it may
+     * be necessary to detach values.</p>
+     */
+    default <C> LTable<K, C> collectValues(SSupplier<C> emptySupplier, SBiConsumer<C, V> addFn, PType<C> pType) {
+        return parallelDo(ctx -> {
+            C coll = emptySupplier.get();
+            ctx.element().second().forEach(v -> addFn.accept(coll, v));
+            ctx.emit(Pair.of(ctx.element().first(), coll));
+        }, ptf().tableOf(keyType(), pType));
+    }
+
+    /**
+     * Collect all values for each key into a {@link Collection}
+     */
+    default LTable<K, Collection<V>> collectAllValues() {
+        return collectValues(ArrayList::new, Collection::add, ptf().collections(valueType()));
+    }
+
+    /**
+     * Collect all unique values for each key into a {@link Collection}. Values are de-duplicated with a
+     * {@link HashSet}, so the value type must have correctly-defined equals() and hashCode() methods.
+     */
+    default LTable<K, Collection<V>> collectUniqueValues() {
+        return collectValues(HashSet::new, Collection::add, ptf().collections(valueType()));
+    }
+
+    /**
+     * Reduce the values for each key using an associative binary operator.
+     * For example {@code reduceValues((a, b) -> a + b)} for summation, {@code reduceValues((a, b) -> a + ", " + b)}
+     * for comma-separated string concatenation and {@code reduceValues((a, b) -> a > b ? a : b)} for maximum value.
+     *
+     * <p>This is implemented on top of combineValues, so the operator may be applied in both the combine and the
+     * reduce phase. The first value seen for a key is held by reference as the running result, so with serialization
+     * systems which reuse objects (such as Avro) it may be necessary to detach values.</p>
+     */
+    default LTable<K, V> reduceValues(SBinaryOperator<V> operator) {
+        return combineValues(() -> (V)null, (a, b) -> a == null ? b : operator.apply(a, b), Collections::singleton);
+    }
+
+    /**
+     * Ungroup this LGroupedTable back into an {@link LTable}. This will still trigger a "reduce" operation, so is
+     * usually only used in special cases like producing a globally-ordered list by feeding everything through
+     * a single reducer.
+     */
+    default LTable<K, V> ungroup() {
+        return factory().wrap(underlying().ungroup());
+    }
+
+    /**
+     * Get a {@link PType} which can be used to serialize the key part of this grouped table
+     */
+    default PType<K> keyType() {
+        return underlying().getGroupedTableType().getTableType().getKeyType();
+    }
+
+    /**
+     * Get a {@link PType} which can be used to serialize the value part of this grouped table
+     */
+    default PType<V> valueType() {
+        return underlying().getGroupedTableType().getTableType().getValueType();
+    }
+
+}