You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:37 UTC
[35/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
new file mode 100644
index 0000000..84c720c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Manages the state of a pipeline execution.
+ *
+ */
+public interface Pipeline {
+
+ /**
+ * Sets the {@code Configuration} to use with this pipeline.
+ */
+ void setConfiguration(Configuration conf);
+
+ /**
+ * Returns the name of this pipeline.
+ *
+ * @return Name of the pipeline
+ */
+ String getName();
+
+ /**
+ * Returns the {@code Configuration} instance associated with this pipeline.
+ */
+ Configuration getConfiguration();
+
+ /**
+ * Converts the given {@code Source} into a {@code PCollection} that is
+ * available to jobs run using this {@code Pipeline} instance.
+ *
+ * @param source
+ * The source of data
+ * @return A PCollection that references the given source
+ */
+ <T> PCollection<T> read(Source<T> source);
+
+ /**
+ * A version of the read method for {@code TableSource} instances that map to
+ * {@code PTable}s.
+ *
+ * @param tableSource
+ * The source of the data
+ * @return A PTable that references the given source
+ */
+ <K, V> PTable<K, V> read(TableSource<K, V> tableSource);
+
+ /**
+ * Write the given collection to the given target on the next pipeline run. The
+ * system will check to see if the target's location already exists using the
+ * {@code WriteMode.DEFAULT} rule for the given {@code Target}.
+ *
+ * @param collection
+ * The collection
+ * @param target
+ * The output target
+ */
+ void write(PCollection<?> collection, Target target);
+
+ /**
+ * Write the contents of the {@code PCollection} to the given {@code Target},
+ * using the storage format specified by the target and the given
+ * {@code WriteMode} for cases where the referenced {@code Target}
+ * already exists.
+ *
+ * @param collection
+ * The collection
+ * @param target
+ * The target to write to
+ * @param writeMode
+ * The strategy to use for handling existing outputs
+ */
+ void write(PCollection<?> collection, Target target,
+ Target.WriteMode writeMode);
+
+ /**
+ * Create the given PCollection and read the data it contains into the
+ * returned {@code Iterable} instance for client use.
+ *
+ * @param pcollection
+ * The PCollection to materialize
+ * @return the data from the PCollection as a read-only {@code Iterable}
+ */
+ <T> Iterable<T> materialize(PCollection<T> pcollection);
+
+ /**
+ * Constructs and executes a series of MapReduce jobs in order to write data
+ * to the output targets.
+ */
+ PipelineResult run();
+
+ /**
+ * Constructs and starts a series of MapReduce jobs in order to write data to
+ * the output targets, but returns a {@code PipelineExecution} to allow clients to control
+ * job execution.
+ * @return a handle for controlling the started pipeline execution
+ */
+ PipelineExecution runAsync();
+
+ /**
+ * Run any remaining jobs required to generate outputs and then clean up any
+ * intermediate data files that were created in this run or previous calls to
+ * {@code run}.
+ */
+ PipelineResult done();
+
+ /**
+ * A convenience method for reading the text file at {@code pathName} as a {@code PCollection} of {@code String}s.
+ */
+ PCollection<String> readTextFile(String pathName);
+
+ /**
+ * A convenience method for writing {@code collection} as a text file at {@code pathName}.
+ */
+ <T> void writeTextFile(PCollection<T> collection, String pathName);
+
+ /**
+ * Turn on debug logging for jobs that are run from this pipeline.
+ */
+ void enableDebug();
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java b/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
new file mode 100644
index 0000000..fc6bb91
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A handle to allow clients to control a Crunch pipeline as it runs.
+ *
+ * This interface is thread-safe.
+ */
+public interface PipelineExecution {
+
+ enum Status { READY, RUNNING, SUCCEEDED, FAILED, KILLED } // SUCCEEDED, FAILED and KILLED mean the pipeline has completed
+
+ /** Returns the .dot file that allows a client to graph the Crunch execution plan for this
+ * pipeline.
+ */
+ String getPlanDotFile();
+
+ /** Blocks until the pipeline completes or the specified waiting time has elapsed. */
+ void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException;
+
+ /** Blocks until the pipeline completes, i.e. {@code SUCCEEDED}, {@code FAILED} or {@code KILLED}. */
+ void waitUntilDone() throws InterruptedException;
+
+ Status getStatus(); // Returns the current Status of this pipeline execution.
+
+ /** Retrieves the result of the pipeline if it has completed; otherwise returns {@code null}. */
+ PipelineResult getResult();
+
+ /**
+ * Kills the pipeline if it is running, no-op otherwise.
+ *
+ * This method only delivers a kill signal to the pipeline, and does not guarantee that the pipeline has exited when it returns.
+ * To wait for the pipeline to exit completely, call {@link #waitUntilDone()} after this call.
+ */
+ void kill() throws InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
new file mode 100644
index 0000000..90b1067
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Container for the results of a call to {@code run} or {@code done} on the
+ * Pipeline interface that includes details and statistics about the component
+ * stages of the data pipeline.
+ */
+public class PipelineResult {
+
+ public static class StageResult { // name and Hadoop counters of a single stage
+
+ private final String stageName;
+ private final Counters counters;
+
+ public StageResult(String stageName, Counters counters) {
+ this.stageName = stageName;
+ this.counters = counters;
+ }
+
+ public String getStageName() {
+ return stageName;
+ }
+
+ public Counters getCounters() {
+ return counters;
+ }
+
+ public Counter findCounter(Enum<?> key) {
+ return counters.findCounter(key); // delegates directly to the stage's Counters
+ }
+
+ public long getCounterValue(Enum<?> key) {
+ return findCounter(key).getValue();
+ }
+ }
+
+ public static final PipelineResult EMPTY = new PipelineResult(ImmutableList.<StageResult> of()); // no stages, so succeeded() is false
+
+ private final List<StageResult> stageResults;
+
+ public PipelineResult(List<StageResult> stageResults) {
+ this.stageResults = ImmutableList.copyOf(stageResults); // immutable snapshot of the caller's list
+ }
+
+ public boolean succeeded() {
+ return !stageResults.isEmpty(); // NOTE(review): success is inferred from having at least one stage result - confirm intended
+ }
+
+ public List<StageResult> getStageResults() {
+ return stageResults;
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/Source.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Source.java b/crunch-core/src/main/java/org/apache/crunch/Source.java
new file mode 100644
index 0000000..f54d135
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Source.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.IOException;
+
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * A {@code Source} represents a data set that is an input to one or more
+ * MapReduce jobs.
+ * @param <T> the element type described by {@link #getType()}
+ */
+public interface Source<T> {
+ /**
+ * Returns the {@code PType} for this source.
+ */
+ PType<T> getType();
+
+ /**
+ * Configure the given job to use this source as an input.
+ *
+ * @param job
+ * The job to configure
+ * @param inputId
+ * For a multi-input job, an identifier for this input to the job
+ * @throws IOException if configuring the job fails (conditions are implementation-specific)
+ */
+ void configureSource(Job job, int inputId) throws IOException;
+
+ /**
+ * Returns the number of bytes in this {@code Source}.
+ */
+ long getSize(Configuration configuration);
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
new file mode 100644
index 0000000..09c03c6
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * An interface for classes that implement both the {@code Source} and the
+ * {@code Target} interfaces.
+ * @param <T> the type parameter passed on to {@code Source}
+ */
+public interface SourceTarget<T> extends Source<T>, Target {
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/TableSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/TableSource.java b/crunch-core/src/main/java/org/apache/crunch/TableSource.java
new file mode 100644
index 0000000..ff27346
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/TableSource.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.crunch.types.PTableType;
+
+/**
+ * The interface for {@code Source} implementations that return a {@link PTable}.
+ *
+ */
+public interface TableSource<K, V> extends Source<Pair<K, V>> {
+ PTableType<K, V> getTableType(); // Returns the PTableType for this source.
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/TableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/TableSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/TableSourceTarget.java
new file mode 100644
index 0000000..9b1ed34
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/TableSourceTarget.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * An interface for classes that implement both the {@code TableSource} and the
+ * {@code Target} interfaces; the latter is inherited through {@code SourceTarget}.
+ */
+public interface TableSourceTarget<K, V> extends TableSource<K, V>, SourceTarget<Pair<K, V>> {
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/Target.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Target.java b/crunch-core/src/main/java/org/apache/crunch/Target.java
new file mode 100644
index 0000000..0a0c23d
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Target.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@code Target} represents the output destination of a Crunch {@code PCollection}
+ * in the context of a Crunch job.
+ */
+public interface Target {
+
+ /**
+ * An enum to represent different options the client may specify
+ * for handling the case where the output path, table, etc. referenced
+ * by a {@code Target} already exists.
+ */
+ enum WriteMode {
+ /**
+ * Check to see if the output target already exists before running
+ * the pipeline, and if it does, print an error and throw an exception.
+ */
+ DEFAULT,
+
+ /**
+ * Check to see if the output target already exists, and if it does,
+ * delete it and overwrite it with the new output (if any).
+ */
+ OVERWRITE,
+
+ /**
+ * If the output target does not exist, create it. If it does exist,
+ * add the output of this pipeline to the target. This was the
+ * behavior in Crunch up to version 0.4.0.
+ */
+ APPEND
+ }
+
+ /**
+ * Applies the given {@code WriteMode} to this {@code Target} instance.
+ *
+ * @param writeMode The strategy for handling existing outputs
+ * @param conf The {@code Configuration} instance to use
+ */
+ void handleExisting(WriteMode writeMode, Configuration conf);
+
+ /**
+ * Checks to see if this {@code Target} instance is compatible with the
+ * given {@code PType}.
+ *
+ * @param handler The {@link OutputHandler} that is managing the output for the job
+ * @param ptype The {@code PType} to check
+ * @return True if this Target can write data in the form of the given {@code PType},
+ * false otherwise
+ */
+ boolean accept(OutputHandler handler, PType<?> ptype);
+
+ /**
+ * Attempts to create the {@code SourceTarget} type that corresponds to this {@code Target}
+ * for the given {@code PType}, if possible. If it is not possible, returns {@code null}.
+ *
+ * @param ptype The {@code PType} to use in constructing the {@code SourceTarget}
+ * @return A new {@code SourceTarget}, or {@code null} if such a {@code SourceTarget} does not exist
+ */
+ <T> SourceTarget<T> asSourceTarget(PType<T> ptype);
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/Tuple.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Tuple.java b/crunch-core/src/main/java/org/apache/crunch/Tuple.java
new file mode 100644
index 0000000..4e602ff
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Tuple.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * A fixed-size collection of Objects, used in Crunch for representing joins
+ * between {@code PCollection}s.
+ *
+ */
+public interface Tuple {
+
+ /**
+ * Returns the Object at the given zero-based index.
+ */
+ Object get(int index);
+
+ /**
+ * Returns the number of elements in this Tuple.
+ */
+ int size();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/Tuple3.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Tuple3.java b/crunch-core/src/main/java/org/apache/crunch/Tuple3.java
new file mode 100644
index 0000000..4372811
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Tuple3.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * A convenience class for three-element {@link Tuple}s, with typed accessors and value-based equality.
+ */
+public class Tuple3<V1, V2, V3> implements Tuple {
+
+ private final V1 first;
+ private final V2 second;
+ private final V3 third;
+
+ public static <A, B, C> Tuple3<A, B, C> of(A a, B b, C c) { // static factory equivalent to the constructor
+ return new Tuple3<A, B, C>(a, b, c);
+ }
+
+ public Tuple3(V1 first, V2 second, V3 third) {
+ this.first = first;
+ this.second = second;
+ this.third = third;
+ }
+
+ public V1 first() {
+ return first;
+ }
+
+ public V2 second() {
+ return second;
+ }
+
+ public V3 third() {
+ return third;
+ }
+
+ public Object get(int index) {
+ switch (index) {
+ case 0:
+ return first;
+ case 1:
+ return second;
+ case 2:
+ return third;
+ default:
+ throw new ArrayIndexOutOfBoundsException(); // only indices 0, 1 and 2 are valid
+ }
+ }
+
+ public int size() {
+ return 3;
+ }
+
+ @Override
+ public int hashCode() {
+ HashCodeBuilder hcb = new HashCodeBuilder();
+ return hcb.append(first).append(second).append(third).toHashCode(); // combines the same three fields that equals compares
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass()) // exact class match required; a subclass instance is never equal
+ return false;
+ Tuple3<?, ?, ?> other = (Tuple3<?, ?, ?>) obj; // each comparison below treats two nulls as equal
+ return (first == other.first || (first != null && first.equals(other.first)))
+ && (second == other.second || (second != null && second.equals(other.second)))
+ && (third == other.third || (third != null && third.equals(other.third)));
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("Tuple3[");
+ sb.append(first).append(",").append(second).append(",").append(third);
+ return sb.append("]").toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/Tuple4.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Tuple4.java b/crunch-core/src/main/java/org/apache/crunch/Tuple4.java
new file mode 100644
index 0000000..f161371
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Tuple4.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * A convenience class for four-element {@link Tuple}s.
+ */
+public class Tuple4<V1, V2, V3, V4> implements Tuple { // four typed, final elements with value-based equality
+
+ private final V1 first;
+ private final V2 second;
+ private final V3 third;
+ private final V4 fourth;
+
+ public static <A, B, C, D> Tuple4<A, B, C, D> of(A a, B b, C c, D d) { // static factory equivalent to the constructor
+ return new Tuple4<A, B, C, D>(a, b, c, d);
+ }
+
+ public Tuple4(V1 first, V2 second, V3 third, V4 fourth) {
+ this.first = first;
+ this.second = second;
+ this.third = third;
+ this.fourth = fourth;
+ }
+
+ public V1 first() {
+ return first;
+ }
+
+ public V2 second() {
+ return second;
+ }
+
+ public V3 third() {
+ return third;
+ }
+
+ public V4 fourth() {
+ return fourth;
+ }
+
+ public Object get(int index) {
+ switch (index) {
+ case 0:
+ return first;
+ case 1:
+ return second;
+ case 2:
+ return third;
+ case 3:
+ return fourth;
+ default:
+ throw new ArrayIndexOutOfBoundsException(); // only indices 0 through 3 are valid
+ }
+ }
+
+ public int size() {
+ return 4;
+ }
+
+ @Override
+ public int hashCode() {
+ HashCodeBuilder hcb = new HashCodeBuilder();
+ return hcb.append(first).append(second).append(third).append(fourth).toHashCode(); // combines the same four fields that equals compares
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass()) // exact class match required; a subclass instance is never equal
+ return false;
+ Tuple4<?, ?, ?, ?> other = (Tuple4<?, ?, ?, ?>) obj; // each comparison below treats two nulls as equal
+ return (first == other.first || (first != null && first.equals(other.first)))
+ && (second == other.second || (second != null && second.equals(other.second)))
+ && (third == other.third || (third != null && third.equals(other.third)))
+ && (fourth == other.fourth || (fourth != null && fourth.equals(other.fourth)));
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("Tuple4[");
+ sb.append(first).append(",").append(second).append(",").append(third);
+ return sb.append(",").append(fourth).append("]").toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/TupleN.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/TupleN.java b/crunch-core/src/main/java/org/apache/crunch/TupleN.java
new file mode 100644
index 0000000..e5eceb5
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/TupleN.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.Arrays;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * A {@link Tuple} instance for an arbitrary number of values, stored in a private copy of the given array.
+ */
+public class TupleN implements Tuple {
+
+ private final Object values[];
+
+ public static TupleN of(Object... values) { // static factory equivalent to the constructor
+ return new TupleN(values);
+ }
+
+ public TupleN(Object... values) {
+ this.values = new Object[values.length]; // copy so later changes to the caller's array are not visible here
+ System.arraycopy(values, 0, this.values, 0, values.length);
+ }
+
+ public Object get(int index) {
+ return values[index]; // an out-of-range index fails with ArrayIndexOutOfBoundsException
+ }
+
+ public int size() {
+ return values.length;
+ }
+
+ @Override
+ public int hashCode() {
+ HashCodeBuilder hcb = new HashCodeBuilder();
+ for (Object v : values) { // fold every element, in order, into the hash
+ hcb.append(v);
+ }
+ return hcb.toHashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass()) // exact class match required; a subclass instance is never equal
+ return false;
+ TupleN other = (TupleN) obj;
+ return Arrays.equals(this.values, other.values); // same length and pairwise-equal elements
+ }
+
+ @Override
+ public String toString() {
+ return Arrays.toString(values);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
new file mode 100644
index 0000000..0ac79e2
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
@@ -0,0 +1,1111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import java.math.BigInteger;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.apache.crunch.Aggregator;
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.util.Tuples;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+
+/**
+ * A collection of pre-defined {@link org.apache.crunch.Aggregator}s.
+ *
+ * <p>The factory methods of this class return {@link org.apache.crunch.Aggregator}
+ * instances that you can use to combine the values of a {@link PGroupedTable}.
+ * In most cases, they turn a multimap (multiple entries per key) into a map (one
+ * entry per key).</p>
+ *
+ * <p><strong>Note</strong>: When using composed aggregators, like those built by the
+ * {@link #pairAggregator(Aggregator, Aggregator) pairAggregator()}
+ * factory method, you typically don't want to put in the same child aggregator more than once,
+ * even if all child aggregators have the same type. In most cases, this is what you want:</p>
+ *
+ * <pre>
+ * PTable<K, Pair<Long, Long>> result = groupedTable.combineValues(
+ * pairAggregator(SUM_LONGS(), SUM_LONGS())
+ * );
+ * </pre>
+ */
+public final class Aggregators {
+
+ private Aggregators() {
+ // utility class, not for instantiation
+ }
+
+ /**
+ * Sum up all {@code long} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Long> SUM_LONGS() {
+ return new SumLongs();
+ }
+
+ /**
+ * Sum up all {@code int} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Integer> SUM_INTS() {
+ return new SumInts();
+ }
+
+ /**
+ * Sum up all {@code float} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Float> SUM_FLOATS() {
+ return new SumFloats();
+ }
+
+ /**
+ * Sum up all {@code double} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Double> SUM_DOUBLES() {
+ return new SumDoubles();
+ }
+
+ /**
+ * Sum up all {@link BigInteger} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<BigInteger> SUM_BIGINTS() {
+ return new SumBigInts();
+ }
+
+ /**
+ * Return the maximum of all given {@code long} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Long> MAX_LONGS() {
+ return new MaxLongs();
+ }
+
+ /**
+  * Return the {@code n} largest {@code long} values (or fewer if there are fewer
+  * values than {@code n}).
+  * @param n The number of values to return
+  * @return The newly constructed instance
+  */
+ public static Aggregator<Long> MAX_LONGS(int n) {
+   // Must honour n: the single-value MaxLongs aggregator would silently
+   // return only one maximum regardless of the requested count.
+   return new MaxNAggregator<Long>(n);
+ }
+
+ /**
+ * Return the maximum of all given {@code int} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Integer> MAX_INTS() {
+ return new MaxInts();
+ }
+
+ /**
+ * Return the {@code n} largest {@code int} values (or fewer if there are fewer
+ * values than {@code n}).
+ * @param n The number of values to return
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Integer> MAX_INTS(int n) {
+ return new MaxNAggregator<Integer>(n);
+ }
+
+ /**
+ * Return the maximum of all given {@code float} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Float> MAX_FLOATS() {
+ return new MaxFloats();
+ }
+
+ /**
+ * Return the {@code n} largest {@code float} values (or fewer if there are fewer
+ * values than {@code n}).
+ * @param n The number of values to return
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Float> MAX_FLOATS(int n) {
+ return new MaxNAggregator<Float>(n);
+ }
+
+ /**
+ * Return the maximum of all given {@code double} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Double> MAX_DOUBLES() {
+ return new MaxDoubles();
+ }
+
+ /**
+ * Return the {@code n} largest {@code double} values (or fewer if there are fewer
+ * values than {@code n}).
+ * @param n The number of values to return
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Double> MAX_DOUBLES(int n) {
+ return new MaxNAggregator<Double>(n);
+ }
+
+ /**
+ * Return the maximum of all given {@link BigInteger} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<BigInteger> MAX_BIGINTS() {
+ return new MaxBigInts();
+ }
+
+ /**
+ * Return the {@code n} largest {@link BigInteger} values (or fewer if there are fewer
+ * values than {@code n}).
+ * @param n The number of values to return
+ * @return The newly constructed instance
+ */
+ public static Aggregator<BigInteger> MAX_BIGINTS(int n) {
+ return new MaxNAggregator<BigInteger>(n);
+ }
+
+ /**
+ * Return the {@code n} largest values (or fewer if there are fewer
+ * values than {@code n}).
+ * @param n The number of values to return
+ * @param cls The type of the values to aggregate (must implement {@link Comparable}!)
+ * @return The newly constructed instance
+ */
+ public static <V extends Comparable<V>> Aggregator<V> MAX_N(int n, Class<V> cls) {
+ return new MaxNAggregator<V>(n);
+ }
+
+ /**
+ * Return the minimum of all given {@code long} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Long> MIN_LONGS() {
+ return new MinLongs();
+ }
+
+ /**
+ * Return the {@code n} smallest {@code long} values (or fewer if there are fewer
+ * values than {@code n}).
+ * @param n The number of values to return
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Long> MIN_LONGS(int n) {
+ return new MinNAggregator<Long>(n);
+ }
+
+ /**
+ * Return the minimum of all given {@code int} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Integer> MIN_INTS() {
+ return new MinInts();
+ }
+
+ /**
+ * Return the {@code n} smallest {@code int} values (or fewer if there are fewer
+ * values than {@code n}).
+ * @param n The number of values to return
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Integer> MIN_INTS(int n) {
+ return new MinNAggregator<Integer>(n);
+ }
+
+ /**
+ * Return the minimum of all given {@code float} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Float> MIN_FLOATS() {
+ return new MinFloats();
+ }
+
+ /**
+ * Return the {@code n} smallest {@code float} values (or fewer if there are fewer
+ * values than {@code n}).
+ * @param n The number of values to return
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Float> MIN_FLOATS(int n) {
+ return new MinNAggregator<Float>(n);
+ }
+
+ /**
+ * Return the minimum of all given {@code double} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Double> MIN_DOUBLES() {
+ return new MinDoubles();
+ }
+
+ /**
+ * Return the {@code n} smallest {@code double} values (or fewer if there are fewer
+ * values than {@code n}).
+ * @param n The number of values to return
+ * @return The newly constructed instance
+ */
+ public static Aggregator<Double> MIN_DOUBLES(int n) {
+ return new MinNAggregator<Double>(n);
+ }
+
+ /**
+ * Return the minimum of all given {@link BigInteger} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<BigInteger> MIN_BIGINTS() {
+ return new MinBigInts();
+ }
+
+ /**
+ * Return the {@code n} smallest {@link BigInteger} values (or fewer if there are fewer
+ * values than {@code n}).
+ * @param n The number of values to return
+ * @return The newly constructed instance
+ */
+ public static Aggregator<BigInteger> MIN_BIGINTS(int n) {
+ return new MinNAggregator<BigInteger>(n);
+ }
+
+ /**
+ * Return the {@code n} smallest values (or fewer if there are fewer
+ * values than {@code n}).
+ * @param n The number of values to return
+ * @param cls The type of the values to aggregate (must implement {@link Comparable}!)
+ * @return The newly constructed instance
+ */
+ public static <V extends Comparable<V>> Aggregator<V> MIN_N(int n, Class<V> cls) {
+ return new MinNAggregator<V>(n);
+ }
+
+ /**
+ * Return the first {@code n} values (or fewer if there are fewer values than {@code n}).
+ *
+ * @param n The number of values to return
+ * @return The newly constructed instance
+ */
+ public static <V> Aggregator<V> FIRST_N(int n) {
+ return new FirstNAggregator<V>(n);
+ }
+
+ /**
+ * Return the last {@code n} values (or fewer if there are fewer values than {@code n}).
+ *
+ * @param n The number of values to return
+ * @return The newly constructed instance
+ */
+ public static <V> Aggregator<V> LAST_N(int n) {
+ return new LastNAggregator<V>(n);
+ }
+
+ /**
+ * Concatenate strings, with a separator between strings. There
+ * is no limit on the length of the concatenated string.
+ *
+ * <p><em>Note: String concatenation is not commutative, which means the
+ * result of the aggregation is not deterministic!</em></p>
+ *
+ * @param separator
+ * the separator which will be inserted between consecutive strings
+ * @param skipNull
+ * whether null values should be skipped. If set to false and
+ * there is a null value, a NullPointerException is thrown.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<String> STRING_CONCAT(String separator, boolean skipNull) {
+ return new StringConcatAggregator(separator, skipNull);
+ }
+
+ /**
+ * Concatenate strings, with a separator between strings. You can specify
+ * the maximum length of the output string and of the input strings, if
+ * they are > 0. If a value is <= 0, there is no limit.
+ *
+ * <p>Any string that is too long (or any string which would make the output
+ * too long) will be silently discarded.</p>
+ *
+ * <p><em>Note: String concatenation is not commutative, which means the
+ * result of the aggregation is not deterministic!</em></p>
+ *
+ * @param separator
+ * the separator which will be inserted between consecutive strings
+ * @param skipNull
+ * whether null values should be skipped. If set to false and
+ * there is a null value, a NullPointerException is thrown.
+ * @param maxOutputLength
+ * the maximum length of the output string. If it's set <= 0,
+ * there is no limit. Otherwise the output string, separators
+ * included, will contain at most maxOutputLength characters.
+ * @param maxInputLength
+ * the maximum length of the input strings. If it's set <= 0,
+ * there is no limit. Otherwise only input strings of at most
+ * maxInputLength characters are concatenated.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<String> STRING_CONCAT(String separator, boolean skipNull,
+ long maxOutputLength, long maxInputLength) {
+ return new StringConcatAggregator(separator, skipNull, maxOutputLength, maxInputLength);
+ }
+
+ /**
+ * Collect the unique elements of the input, as defined by the {@code equals} method for
+ * the input objects. No guarantees are made about the order in which the final elements
+ * will be returned.
+ *
+ * @return The newly constructed instance
+ */
+ public static <V> Aggregator<V> UNIQUE_ELEMENTS() {
+ return new SetAggregator<V>();
+ }
+
+ /**
+ * Collect a sample of unique elements from the input, where 'unique' is defined by
+ * the {@code equals} method for the input objects. No guarantees are made about which
+ * elements will be returned, simply that there will not be any more than the given sample
+ * size for any key.
+ *
+ * @param maximumSampleSize The maximum number of unique elements to return per key
+ * @return The newly constructed instance
+ */
+ public static <V> Aggregator<V> SAMPLE_UNIQUE_ELEMENTS(int maximumSampleSize) {
+ return new SetAggregator<V>(maximumSampleSize);
+ }
+
+ /**
+ * Apply separate aggregators to each component of a {@link Pair}.
+ */
+ public static <V1, V2> Aggregator<Pair<V1, V2>> pairAggregator(
+ Aggregator<V1> a1, Aggregator<V2> a2) {
+ return new PairAggregator<V1, V2>(a1, a2);
+ }
+
+ /**
+ * Apply separate aggregators to each component of a {@link Tuple3}.
+ */
+ public static <V1, V2, V3> Aggregator<Tuple3<V1, V2, V3>> tripAggregator(
+ Aggregator<V1> a1, Aggregator<V2> a2, Aggregator<V3> a3) {
+ return new TripAggregator<V1, V2, V3>(a1, a2, a3);
+ }
+
+ /**
+ * Apply separate aggregators to each component of a {@link Tuple4}.
+ */
+ public static <V1, V2, V3, V4> Aggregator<Tuple4<V1, V2, V3, V4>> quadAggregator(
+ Aggregator<V1> a1, Aggregator<V2> a2, Aggregator<V3> a3, Aggregator<V4> a4) {
+ return new QuadAggregator<V1, V2, V3, V4>(a1, a2, a3, a4);
+ }
+
+ /**
+ * Apply separate aggregators to each component of a {@link Tuple}.
+ */
+ public static Aggregator<TupleN> tupleAggregator(Aggregator<?>... aggregators) {
+ return new TupleNAggregator(aggregators);
+ }
+
+ /**
+ * Wrap a {@link CombineFn} adapter around the given aggregator.
+ *
+ * @param aggregator The instance to wrap
+ * @return A {@link CombineFn} delegating to {@code aggregator}
+ */
+ public static final <K, V> CombineFn<K, V> toCombineFn(Aggregator<V> aggregator) {
+ return new AggregatorCombineFn<K, V>(aggregator);
+ }
+
+ /**
+ * Base class for aggregators that do not require any initialization.
+ */
+ public static abstract class SimpleAggregator<T> implements Aggregator<T> {
+ @Override
+ public void initialize(Configuration conf) {
+ // No-op
+ }
+ }
+
+ /**
+  * Adapter that exposes an {@code Aggregator} through the {@code CombineFn}
+  * interface: for every key, the aggregator is reset, fed all values of that
+  * key, and each of its results is emitted paired with the key.
+  */
+ private static class AggregatorCombineFn<K, V> extends CombineFn<K, V> {
+   // TODO: Has to be fully qualified until CombineFn.Aggregator can be removed.
+   private final org.apache.crunch.Aggregator<V> delegate;
+
+   public AggregatorCombineFn(org.apache.crunch.Aggregator<V> aggregator) {
+     this.delegate = aggregator;
+   }
+
+   @Override
+   public void initialize() {
+     // Hand the function's configuration on to the wrapped aggregator.
+     delegate.initialize(getConfiguration());
+   }
+
+   @Override
+   public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
+     // Start from a clean state for every key.
+     delegate.reset();
+     Iterable<V> groupedValues = input.second();
+     for (V value : groupedValues) {
+       delegate.update(value);
+     }
+     Iterable<V> aggregated = delegate.results();
+     for (V result : aggregated) {
+       emitter.emit(Pair.of(input.first(), result));
+     }
+   }
+ }
+
+ /**
+  * Keeps a running {@code long} total of the values it is given.
+  */
+ private static class SumLongs extends SimpleAggregator<Long> {
+   private long total = 0L;
+
+   @Override
+   public void reset() {
+     total = 0L;
+   }
+
+   @Override
+   public void update(Long next) {
+     total = total + next;
+   }
+
+   @Override
+   public Iterable<Long> results() {
+     // Always exactly one result: the current total.
+     return ImmutableList.of(total);
+   }
+ }
+
+ /**
+  * Keeps a running {@code int} total of the values it is given.
+  */
+ private static class SumInts extends SimpleAggregator<Integer> {
+   private int total = 0;
+
+   @Override
+   public void reset() {
+     total = 0;
+   }
+
+   @Override
+   public void update(Integer next) {
+     total = total + next;
+   }
+
+   @Override
+   public Iterable<Integer> results() {
+     // Always exactly one result: the current total.
+     return ImmutableList.of(total);
+   }
+ }
+
+ /**
+  * Keeps a running {@code float} total of the values it is given.
+  */
+ private static class SumFloats extends SimpleAggregator<Float> {
+   private float total = 0.0f;
+
+   @Override
+   public void reset() {
+     total = 0.0f;
+   }
+
+   @Override
+   public void update(Float next) {
+     total = total + next;
+   }
+
+   @Override
+   public Iterable<Float> results() {
+     // Always exactly one result: the current total.
+     return ImmutableList.of(total);
+   }
+ }
+
+ /**
+  * Keeps a running {@code double} total of the values it is given.
+  */
+ private static class SumDoubles extends SimpleAggregator<Double> {
+   private double total = 0.0;
+
+   @Override
+   public void reset() {
+     total = 0.0;
+   }
+
+   @Override
+   public void update(Double next) {
+     total = total + next;
+   }
+
+   @Override
+   public Iterable<Double> results() {
+     // Always exactly one result: the current total.
+     return ImmutableList.of(total);
+   }
+ }
+
+ private static class SumBigInts extends SimpleAggregator<BigInteger> {
+ private BigInteger sum = BigInteger.ZERO;
+
+ @Override
+ public void reset() {
+ sum = BigInteger.ZERO;
+ }
+
+ @Override
+ public void update(BigInteger next) {
+ sum = sum.add(next);
+ }
+
+ @Override
+ public Iterable<BigInteger> results() {
+ return ImmutableList.of(sum);
+ }
+ }
+
+ /**
+  * Tracks the largest {@code Long} seen since the last reset.
+  */
+ private static class MaxLongs extends SimpleAggregator<Long> {
+   // null until the first value after a reset has been seen
+   private Long largest = null;
+
+   @Override
+   public void reset() {
+     largest = null;
+   }
+
+   @Override
+   public void update(Long next) {
+     if (largest != null && largest >= next) {
+       return;
+     }
+     largest = next;
+   }
+
+   @Override
+   public Iterable<Long> results() {
+     return ImmutableList.of(largest);
+   }
+ }
+
+ /**
+  * Tracks the largest {@code Integer} seen since the last reset.
+  */
+ private static class MaxInts extends SimpleAggregator<Integer> {
+   // null until the first value after a reset has been seen
+   private Integer largest = null;
+
+   @Override
+   public void reset() {
+     largest = null;
+   }
+
+   @Override
+   public void update(Integer next) {
+     if (largest != null && largest >= next) {
+       return;
+     }
+     largest = next;
+   }
+
+   @Override
+   public Iterable<Integer> results() {
+     return ImmutableList.of(largest);
+   }
+ }
+
+ private static class MaxFloats extends SimpleAggregator<Float> {
+ private Float max = null;
+
+ @Override
+ public void reset() {
+ max = null;
+ }
+
+ @Override
+ public void update(Float next) {
+ if (max == null || max < next) {
+ max = next;
+ }
+ }
+
+ @Override
+ public Iterable<Float> results() {
+ return ImmutableList.of(max);
+ }
+ }
+
+ private static class MaxDoubles extends SimpleAggregator<Double> {
+ private Double max = null;
+
+ @Override
+ public void reset() {
+ max = null;
+ }
+
+ @Override
+ public void update(Double next) {
+ if (max == null || max < next) {
+ max = next;
+ }
+ }
+
+ @Override
+ public Iterable<Double> results() {
+ return ImmutableList.of(max);
+ }
+ }
+
+ /**
+  * Tracks the largest {@link BigInteger} seen since the last reset.
+  */
+ private static class MaxBigInts extends SimpleAggregator<BigInteger> {
+   // null until the first value after a reset has been seen
+   private BigInteger largest = null;
+
+   @Override
+   public void reset() {
+     largest = null;
+   }
+
+   @Override
+   public void update(BigInteger next) {
+     if (largest != null && largest.compareTo(next) >= 0) {
+       return;
+     }
+     largest = next;
+   }
+
+   @Override
+   public Iterable<BigInteger> results() {
+     return ImmutableList.of(largest);
+   }
+ }
+
+ /**
+  * Tracks the smallest {@code Long} seen since the last reset.
+  */
+ private static class MinLongs extends SimpleAggregator<Long> {
+   // null until the first value after a reset has been seen
+   private Long smallest = null;
+
+   @Override
+   public void reset() {
+     smallest = null;
+   }
+
+   @Override
+   public void update(Long next) {
+     if (smallest != null && smallest <= next) {
+       return;
+     }
+     smallest = next;
+   }
+
+   @Override
+   public Iterable<Long> results() {
+     return ImmutableList.of(smallest);
+   }
+ }
+
+ /**
+  * Tracks the smallest {@code Integer} seen since the last reset.
+  */
+ private static class MinInts extends SimpleAggregator<Integer> {
+   // null until the first value after a reset has been seen
+   private Integer smallest = null;
+
+   @Override
+   public void reset() {
+     smallest = null;
+   }
+
+   @Override
+   public void update(Integer next) {
+     if (smallest != null && smallest <= next) {
+       return;
+     }
+     smallest = next;
+   }
+
+   @Override
+   public Iterable<Integer> results() {
+     return ImmutableList.of(smallest);
+   }
+ }
+
+ private static class MinFloats extends SimpleAggregator<Float> {
+ private Float min = null;
+
+ @Override
+ public void reset() {
+ min = null;
+ }
+
+ @Override
+ public void update(Float next) {
+ if (min == null || min > next) {
+ min = next;
+ }
+ }
+
+ @Override
+ public Iterable<Float> results() {
+ return ImmutableList.of(min);
+ }
+ }
+
+ private static class MinDoubles extends SimpleAggregator<Double> {
+ private Double min = null;
+
+ @Override
+ public void reset() {
+ min = null;
+ }
+
+ @Override
+ public void update(Double next) {
+ if (min == null || min > next) {
+ min = next;
+ }
+ }
+
+ @Override
+ public Iterable<Double> results() {
+ return ImmutableList.of(min);
+ }
+ }
+
+ /**
+  * Tracks the smallest {@link BigInteger} seen since the last reset.
+  */
+ private static class MinBigInts extends SimpleAggregator<BigInteger> {
+   // null until the first value after a reset has been seen
+   private BigInteger smallest = null;
+
+   @Override
+   public void reset() {
+     smallest = null;
+   }
+
+   @Override
+   public void update(BigInteger next) {
+     if (smallest != null && smallest.compareTo(next) <= 0) {
+       return;
+     }
+     smallest = next;
+   }
+
+   @Override
+   public Iterable<BigInteger> results() {
+     return ImmutableList.of(smallest);
+   }
+ }
+
+ /**
+  * Retains the {@code arity} largest distinct values seen since the last
+  * reset. Values are held in a sorted set, so a value that compares equal to
+  * one already retained is kept only once.
+  *
+  * <p>{@link #reset()} must be called before the first {@link #update}: the
+  * backing set is created there, not in the constructor.</p>
+  */
+ private static class MaxNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
+   private final int arity;
+   private transient SortedSet<V> elements;
+
+   /**
+    * @param arity the maximum number of values to retain; a value of zero or
+    *        less retains nothing
+    */
+   public MaxNAggregator(int arity) {
+     this.arity = arity;
+   }
+
+   @Override
+   public void reset() {
+     if (elements == null) {
+       elements = Sets.newTreeSet();
+     } else {
+       elements.clear();
+     }
+   }
+
+   @Override
+   public void update(V value) {
+     // Evict only when the insertion actually grew the set beyond the limit.
+     // Evicting before a no-op add (value already present) would lose a
+     // legitimate member, and calling first() on an empty set (arity <= 0)
+     // would throw NoSuchElementException.
+     if (elements.add(value) && elements.size() > arity) {
+       elements.remove(elements.first());
+     }
+   }
+
+   @Override
+   public Iterable<V> results() {
+     // Snapshot in ascending order, detached from the mutable working set.
+     return ImmutableList.copyOf(elements);
+   }
+ }
+
+ /**
+  * Retains the {@code arity} smallest distinct values seen since the last
+  * reset. Values are held in a sorted set, so a value that compares equal to
+  * one already retained is kept only once.
+  *
+  * <p>{@link #reset()} must be called before the first {@link #update}: the
+  * backing set is created there, not in the constructor.</p>
+  */
+ private static class MinNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
+   private final int arity;
+   private transient SortedSet<V> elements;
+
+   /**
+    * @param arity the maximum number of values to retain; a value of zero or
+    *        less retains nothing
+    */
+   public MinNAggregator(int arity) {
+     this.arity = arity;
+   }
+
+   @Override
+   public void reset() {
+     if (elements == null) {
+       elements = Sets.newTreeSet();
+     } else {
+       elements.clear();
+     }
+   }
+
+   @Override
+   public void update(V value) {
+     // Evict only when the insertion actually grew the set beyond the limit.
+     // Evicting before a no-op add (value already present) would lose a
+     // legitimate member, and calling last() on an empty set (arity <= 0)
+     // would throw NoSuchElementException.
+     if (elements.add(value) && elements.size() > arity) {
+       elements.remove(elements.last());
+     }
+   }
+
+   @Override
+   public Iterable<V> results() {
+     // Snapshot in ascending order, detached from the mutable working set.
+     return ImmutableList.copyOf(elements);
+   }
+ }
+
+ /**
+  * Retains the first {@code arity} values passed to {@link #update} after a
+  * reset and ignores everything that follows.
+  */
+ private static class FirstNAggregator<V> extends SimpleAggregator<V> {
+   private final int arity;
+   private final List<V> elements;
+
+   public FirstNAggregator(int arity) {
+     this.elements = Lists.newArrayList();
+     this.arity = arity;
+   }
+
+   @Override
+   public void reset() {
+     elements.clear();
+   }
+
+   @Override
+   public void update(V value) {
+     // Already holding the requested number of values: drop the newcomer.
+     if (elements.size() >= arity) {
+       return;
+     }
+     elements.add(value);
+   }
+
+   @Override
+   public Iterable<V> results() {
+     return ImmutableList.copyOf(elements);
+   }
+ }
+
+ /**
+  * Retains the most recent {@code arity} values passed to {@link #update}:
+  * each new value is appended and, once the list holds one value too many,
+  * the oldest one is dropped.
+  */
+ private static class LastNAggregator<V> extends SimpleAggregator<V> {
+   private final int arity;
+   private final LinkedList<V> elements;
+
+   public LastNAggregator(int arity) {
+     this.elements = Lists.newLinkedList();
+     this.arity = arity;
+   }
+
+   @Override
+   public void reset() {
+     elements.clear();
+   }
+
+   @Override
+   public void update(V value) {
+     elements.addLast(value);
+     // Exactly one over the limit: discard the oldest entry.
+     if (arity + 1 == elements.size()) {
+       elements.removeFirst();
+     }
+   }
+
+   @Override
+   public Iterable<V> results() {
+     return ImmutableList.copyOf(elements);
+   }
+ }
+
+ /**
+  * Collects strings and joins them with a separator when the results are
+  * requested.
+  *
+  * <p>Optionally enforces a maximum length for each input string and for the
+  * joined output; a limit of zero or less means "no limit". Strings that
+  * violate a limit are silently dropped.</p>
+  *
+  * <p>{@link #reset()} must be called before {@link #results()}: the joiner
+  * is created there, not in the constructor.</p>
+  */
+ private static class StringConcatAggregator extends SimpleAggregator<String> {
+   private final String separator;
+   private final boolean skipNulls;
+   private final long maxOutputLength;
+   private final long maxInputLength;
+   // Length the joined output would have for the strings accepted so far.
+   // Starts at -separator.length() because n strings need only n-1 separators.
+   private long currentLength;
+   private final LinkedList<String> list = new LinkedList<String>();
+
+   private transient Joiner joiner;
+
+   /**
+    * Creates an aggregator without any length limits.
+    */
+   public StringConcatAggregator(final String separator, final boolean skipNulls) {
+     this.separator = separator;
+     this.skipNulls = skipNulls;
+     this.maxInputLength = 0;
+     this.maxOutputLength = 0;
+   }
+
+   /**
+    * Creates an aggregator with the given limits; a limit {@code <= 0}
+    * disables the corresponding check.
+    */
+   public StringConcatAggregator(final String separator, final boolean skipNull, final long maxOutputLength, final long maxInputLength) {
+     this.separator = separator;
+     this.skipNulls = skipNull;
+     this.maxOutputLength = maxOutputLength;
+     this.maxInputLength = maxInputLength;
+     this.currentLength = -separator.length();
+   }
+
+   @Override
+   public void reset() {
+     if (joiner == null) {
+       joiner = skipNulls ? Joiner.on(separator).skipNulls() : Joiner.on(separator);
+     }
+     currentLength = -separator.length();
+     list.clear();
+   }
+
+   @Override
+   public void update(final String next) {
+     if (next == null && skipNulls) {
+       // The joiner would drop it anyway; returning here also keeps the
+       // length checks below from dereferencing a null input.
+       return;
+     }
+     long length = (next == null) ? 0 : next.length() + separator.length();
+     boolean outputTooLong = maxOutputLength > 0 && currentLength + length > maxOutputLength;
+     // Guard against null: with skipNulls disabled the null is passed on to
+     // the joiner, which is what rejects it.
+     boolean inputTooLong = maxInputLength > 0 && next != null && next.length() > maxInputLength;
+     if (outputTooLong || inputTooLong) {
+       return;
+     }
+     if (maxOutputLength > 0) {
+       currentLength += length;
+     }
+     list.add(next);
+   }
+
+   @Override
+   public Iterable<String> results() {
+     return ImmutableList.of(joiner.join(list));
+   }
+ }
+
+
+ private static abstract class TupleAggregator<T> implements Aggregator<T> {
+ private final List<Aggregator<Object>> aggregators;
+
+ @SuppressWarnings("unchecked")
+ public TupleAggregator(Aggregator<?>... aggregators) {
+ this.aggregators = Lists.newArrayList();
+ for (Aggregator<?> a : aggregators) {
+ this.aggregators.add((Aggregator<Object>) a);
+ }
+ }
+
+ @Override
+ public void initialize(Configuration configuration) {
+ for (Aggregator<?> a : aggregators) {
+ a.initialize(configuration);
+ }
+ }
+
+ @Override
+ public void reset() {
+ for (Aggregator<?> a : aggregators) {
+ a.reset();
+ }
+ }
+
+ protected void updateTuple(Tuple t) {
+ for (int i = 0; i < aggregators.size(); i++) {
+ aggregators.get(i).update(t.get(i));
+ }
+ }
+
+ protected Iterable<Object> results(int index) {
+ return aggregators.get(index).results();
+ }
+ }
+
+ /**
+  * Aggregates {@link Pair}s by applying one child aggregator per component.
+  */
+ private static class PairAggregator<V1, V2> extends TupleAggregator<Pair<V1, V2>> {
+
+   public PairAggregator(Aggregator<V1> a1, Aggregator<V2> a2) {
+     super(a1, a2);
+   }
+
+   @Override
+   public void update(Pair<V1, V2> value) {
+     updateTuple(value);
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public Iterable<Pair<V1, V2>> results() {
+     // Combine the per-component results back into pairs.
+     Iterable<V1> firsts = (Iterable<V1>) results(0);
+     Iterable<V2> seconds = (Iterable<V2>) results(1);
+     return new Tuples.PairIterable<V1, V2>(firsts, seconds);
+   }
+ }
+
+ /**
+  * Aggregates {@link Tuple3}s by applying one child aggregator per component.
+  */
+ private static class TripAggregator<A, B, C> extends TupleAggregator<Tuple3<A, B, C>> {
+
+   public TripAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3) {
+     super(a1, a2, a3);
+   }
+
+   @Override
+   public void update(Tuple3<A, B, C> value) {
+     updateTuple(value);
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public Iterable<Tuple3<A, B, C>> results() {
+     // Combine the per-component results back into triples.
+     Iterable<A> firsts = (Iterable<A>) results(0);
+     Iterable<B> seconds = (Iterable<B>) results(1);
+     Iterable<C> thirds = (Iterable<C>) results(2);
+     return new Tuples.TripIterable<A, B, C>(firsts, seconds, thirds);
+   }
+ }
+
+ /**
+  * Aggregates {@link Tuple4}s by applying one child aggregator per component.
+  */
+ private static class QuadAggregator<A, B, C, D> extends TupleAggregator<Tuple4<A, B, C, D>> {
+
+   public QuadAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3, Aggregator<D> a4) {
+     super(a1, a2, a3, a4);
+   }
+
+   @Override
+   public void update(Tuple4<A, B, C, D> value) {
+     updateTuple(value);
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public Iterable<Tuple4<A, B, C, D>> results() {
+     // Combine the per-component results back into 4-tuples.
+     Iterable<A> firsts = (Iterable<A>) results(0);
+     Iterable<B> seconds = (Iterable<B>) results(1);
+     Iterable<C> thirds = (Iterable<C>) results(2);
+     Iterable<D> fourths = (Iterable<D>) results(3);
+     return new Tuples.QuadIterable<A, B, C, D>(firsts, seconds, thirds, fourths);
+   }
+ }
+
+ private static class TupleNAggregator extends TupleAggregator<TupleN> {
+ private final int size;
+
+ public TupleNAggregator(Aggregator<?>... aggregators) {
+ super(aggregators);
+ size = aggregators.length;
+ }
+
+ @Override
+ public void update(TupleN value) {
+ updateTuple(value);
+ }
+
+ @Override
+ public Iterable<TupleN> results() {
+ Iterable<?>[] iterables = new Iterable[size];
+ for (int i = 0; i < size; i++) {
+ iterables[i] = results(i);
+ }
+ return new Tuples.TupleNIterable(iterables);
+ }
+ }
+
+ /**
+  * Collects the distinct values seen since the last reset, optionally
+  * stopping once a maximum number of distinct values has been collected.
+  * A size limit of {@code -1} means "no limit".
+  */
+ private static class SetAggregator<V> extends SimpleAggregator<V> {
+   private final Set<V> elements;
+   private final int sizeLimit;
+
+   /**
+    * Creates an aggregator without a size limit.
+    */
+   public SetAggregator() {
+     this(-1);
+   }
+
+   public SetAggregator(int sizeLimit) {
+     this.sizeLimit = sizeLimit;
+     this.elements = Sets.newHashSet();
+   }
+
+   @Override
+   public void reset() {
+     elements.clear();
+   }
+
+   @Override
+   public void update(V value) {
+     // A limit is set and has been reached: ignore further values.
+     if (sizeLimit != -1 && elements.size() >= sizeLimit) {
+       return;
+     }
+     elements.add(value);
+   }
+
+   @Override
+   public Iterable<V> results() {
+     return ImmutableList.copyOf(elements);
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/CompositeMapFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
new file mode 100644
index 0000000..2a8e7d9
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+public class CompositeMapFn<R, S, T> extends MapFn<R, T> {
+
+ private final MapFn<R, S> first;
+ private final MapFn<S, T> second;
+
+ public CompositeMapFn(MapFn<R, S> first, MapFn<S, T> second) {
+ this.first = first;
+ this.second = second;
+ }
+
+ @Override
+ public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+ first.setContext(context);
+ second.setContext(context);
+ }
+
+ @Override
+ public void initialize() {
+ first.initialize();
+ second.initialize();
+ }
+
+ public MapFn<R, S> getFirst() {
+ return first;
+ }
+
+ public MapFn<S, T> getSecond() {
+ return second;
+ }
+
+ @Override
+ public T map(R input) {
+ return second.map(first.map(input));
+ }
+
+ @Override
+ public void cleanup(Emitter<T> emitter) {
+ first.cleanup(null);
+ second.cleanup(null);
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ first.configure(conf);
+ second.configure(conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
new file mode 100644
index 0000000..b8cc9df
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Wraps a key-extracting {@code MapFn} so that every input value is turned into a
+ * (key, value) pair: the key is whatever the wrapped function returns for the
+ * value, and the value is the input itself. Used to convert a
+ * {@code PCollection<V>} into a {@code PTable<K, V>}.
+ */
+public class ExtractKeyFn<K, V> extends MapFn<V, Pair<K, V>> {
+
+  private final MapFn<V, K> mapFn;
+
+  /**
+   * @param mapFn the function that computes the key for a given value
+   */
+  public ExtractKeyFn(MapFn<V, K> mapFn) {
+    this.mapFn = mapFn;
+  }
+
+  /**
+   * Forwards the configuration to the wrapped function. Without this override the
+   * wrapped function would never be configured, unlike its context and
+   * initialization callbacks, which are forwarded below.
+   */
+  @Override
+  public void configure(Configuration conf) {
+    mapFn.configure(conf);
+  }
+
+  /** Forwards the task context to the wrapped function. */
+  @Override
+  public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+    mapFn.setContext(context);
+  }
+
+  /** Initializes the wrapped function. */
+  @Override
+  public void initialize() {
+    mapFn.initialize();
+  }
+
+  /**
+   * @param input the value to key
+   * @return a pair of the extracted key and the unchanged input value
+   */
+  @Override
+  public Pair<K, V> map(V input) {
+    return Pair.of(mapFn.map(input), input);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/FilterFns.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/FilterFns.java b/crunch-core/src/main/java/org/apache/crunch/fn/FilterFns.java
new file mode 100644
index 0000000..8dc4268
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/FilterFns.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.FilterFn.AndFn;
+import org.apache.crunch.FilterFn.NotFn;
+import org.apache.crunch.FilterFn.OrFn;
+
+
+/**
+ * Static factory methods for composing and creating {@link FilterFn} instances.
+ */
+public final class FilterFns {
+  // Note: the combinators are backed by the deprecated implementation classes nested
+  // in FilterFn. When their time is up, those classes simply move here.
+
+  /** Utility class; never instantiated. */
+  private FilterFns() {
+  }
+
+  /**
+   * Accept an entry only if both filters accept it, using short-circuit evaluation.
+   * @param fn1 The first function to delegate to
+   * @param fn2 The second function to delegate to
+   * @return The composed filter function
+   */
+  public static <S> FilterFn<S> and(FilterFn<S> fn1, FilterFn<S> fn2) {
+    return new AndFn<S>(fn1, fn2);
+  }
+
+  /**
+   * Accept an entry only if every given filter accepts it, using short-circuit evaluation.
+   * @param fns The functions to delegate to (in the given order)
+   * @return The composed filter function
+   */
+  public static <S> FilterFn<S> and(FilterFn<S>... fns) {
+    return new AndFn<S>(fns);
+  }
+
+  /**
+   * Accept an entry if at least one of the two filters accepts it, using short-circuit
+   * evaluation.
+   * @param fn1 The first function to delegate to
+   * @param fn2 The second function to delegate to
+   * @return The composed filter function
+   */
+  public static <S> FilterFn<S> or(FilterFn<S> fn1, FilterFn<S> fn2) {
+    return new OrFn<S>(fn1, fn2);
+  }
+
+  /**
+   * Accept an entry if at least one of the given filters accepts it, using short-circuit
+   * evaluation.
+   * @param fns The functions to delegate to (in the given order)
+   * @return The composed filter function
+   */
+  public static <S> FilterFn<S> or(FilterFn<S>... fns) {
+    return new OrFn<S>(fns);
+  }
+
+  /**
+   * Accept an entry exactly when the given filter <em>rejects</em> it.
+   * @param fn The function to delegate to
+   * @return The composed filter function
+   */
+  public static <S> FilterFn<S> not(FilterFn<S> fn) {
+    return new NotFn<S>(fn);
+  }
+
+  /**
+   * Accept everything.
+   * @return A filter function that accepts every entry.
+   */
+  public static <S> FilterFn<S> ACCEPT_ALL() {
+    return new AcceptAllFn<S>();
+  }
+
+  /**
+   * Reject everything.
+   * @return A filter function that rejects every entry, built as the negation of
+   *         an accept-all filter.
+   */
+  public static <S> FilterFn<S> REJECT_ALL() {
+    FilterFn<S> everything = new AcceptAllFn<S>();
+    return new NotFn<S>(everything);
+  }
+
+  /** Filter that lets every input through and reports a scale factor of 1. */
+  private static class AcceptAllFn<S> extends FilterFn<S> {
+    @Override
+    public float scaleFactor() {
+      return 1.0f;
+    }
+
+    @Override
+    public boolean accept(S input) {
+      return true;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/IdentityFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/IdentityFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/IdentityFn.java
new file mode 100644
index 0000000..0eadb06
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/IdentityFn.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.MapFn;
+
+public class IdentityFn<T> extends MapFn<T, T> {
+
+ private static final IdentityFn<Object> INSTANCE = new IdentityFn<Object>();
+
+ @SuppressWarnings("unchecked")
+ public static <T> IdentityFn<T> getInstance() {
+ return (IdentityFn<T>) INSTANCE;
+ }
+
+ // Non-instantiable
+ private IdentityFn() {
+ }
+
+ @Override
+ public T map(T input) {
+ return input;
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
new file mode 100644
index 0000000..cbaf24d
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+
+public abstract class MapKeysFn<K1, K2, V> extends DoFn<Pair<K1, V>, Pair<K2, V>> {
+
+ @Override
+ public void process(Pair<K1, V> input, Emitter<Pair<K2, V>> emitter) {
+ emitter.emit(Pair.of(map(input.first()), input.second()));
+ }
+
+ public abstract K2 map(K1 k1);
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
new file mode 100644
index 0000000..b90f5ff
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+
+public abstract class MapValuesFn<K, V1, V2> extends DoFn<Pair<K, V1>, Pair<K, V2>> {
+
+ @Override
+ public void process(Pair<K, V1> input, Emitter<Pair<K, V2>> emitter) {
+ emitter.emit(Pair.of(input.first(), map(input.second())));
+ }
+
+ public abstract V2 map(V1 v);
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/PairMapFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/PairMapFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/PairMapFn.java
new file mode 100644
index 0000000..9ee4336
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/PairMapFn.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> {
+
+ private MapFn<K, S> keys;
+ private MapFn<V, T> values;
+
+ public PairMapFn(MapFn<K, S> keys, MapFn<V, T> values) {
+ this.keys = keys;
+ this.values = values;
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ keys.configure(conf);
+ values.configure(conf);
+ }
+
+ @Override
+ public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+ keys.setContext(context);
+ values.setContext(context);
+ }
+
+ @Override
+ public void initialize() {
+ keys.initialize();
+ values.initialize();
+ }
+
+ @Override
+ public Pair<S, T> map(Pair<K, V> input) {
+ return Pair.of(keys.map(input.first()), values.map(input.second()));
+ }
+
+ @Override
+ public void cleanup(Emitter<Pair<S, T>> emitter) {
+ keys.cleanup(null);
+ values.cleanup(null);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/package-info.java b/crunch-core/src/main/java/org/apache/crunch/fn/package-info.java
new file mode 100644
index 0000000..acefdff
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Commonly used functions for manipulating collections.
+ */
+package org.apache.crunch.fn;
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
new file mode 100644
index 0000000..887c051
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.hadoop.mapreduce;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * A factory class that allows us to hide the fact that {@code TaskAttemptContext} is a class in
+ * Hadoop 1.x.x and an interface in Hadoop 2.x.x.
+ *
+ * <p>The constructor to invoke is looked up reflectively once, when the class is
+ * loaded. Lookup and construction failures are logged rather than thrown; in that
+ * case {@link #create(Configuration, TaskAttemptID)} returns {@code null}.
+ */
+public class TaskAttemptContextFactory {
+
+  // Must be declared before INSTANCE: the lookup done while building INSTANCE logs through it.
+  private static final Log LOG = LogFactory.getLog(TaskAttemptContextFactory.class);
+
+  private static final TaskAttemptContextFactory INSTANCE = new TaskAttemptContextFactory();
+
+  /**
+   * Creates a {@code TaskAttemptContext} for the given configuration and task attempt.
+   *
+   * @param conf the configuration passed to the context's constructor
+   * @param taskAttemptId the task attempt id passed to the context's constructor
+   * @return the new context, or {@code null} if it could not be constructed (the
+   *         failure is logged)
+   */
+  public static TaskAttemptContext create(Configuration conf, TaskAttemptID taskAttemptId) {
+    return INSTANCE.createInternal(conf, taskAttemptId);
+  }
+
+  // Null when no usable constructor could be located at class-load time.
+  private final Constructor<TaskAttemptContext> taskAttemptConstructor;
+
+  private TaskAttemptContextFactory() {
+    this.taskAttemptConstructor = findConstructor();
+  }
+
+  /**
+   * Locates the {@code (Configuration, TaskAttemptID)} constructor of the concrete
+   * context class: {@code TaskAttemptContext} itself when it is a class, otherwise
+   * {@code TaskAttemptContextImpl}.
+   *
+   * @return the constructor, or {@code null} if the class or constructor is
+   *         unavailable (the failure is logged)
+   */
+  @SuppressWarnings("unchecked") // the impl class is looked up by name, so the cast cannot be checked
+  private static Constructor<TaskAttemptContext> findConstructor() {
+    Class<TaskAttemptContext> implClass = TaskAttemptContext.class;
+    if (implClass.isInterface()) {
+      try {
+        implClass = (Class<TaskAttemptContext>) Class.forName(
+            "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+      } catch (ClassNotFoundException e) {
+        LOG.fatal("Could not find TaskAttemptContextImpl class, exiting", e);
+        // An interface has no constructors; looking one up would only log a second,
+        // misleading failure.
+        return null;
+      }
+    }
+    try {
+      return implClass.getConstructor(Configuration.class, TaskAttemptID.class);
+    } catch (Exception e) {
+      // Caught broadly on purpose: this runs during static initialization, where an
+      // escaping exception would make the whole class unusable.
+      LOG.fatal("Could not access TaskAttemptContext constructor, exiting", e);
+      return null;
+    }
+  }
+
+  private TaskAttemptContext createInternal(Configuration conf, TaskAttemptID taskAttemptId) {
+    if (taskAttemptConstructor == null) {
+      // The lookup failure was already logged with its cause when the class was loaded.
+      LOG.error("Could not construct a TaskAttemptContext instance: no constructor is available");
+      return null;
+    }
+    try {
+      return taskAttemptConstructor.newInstance(conf, taskAttemptId);
+    } catch (Exception e) {
+      LOG.error("Could not construct a TaskAttemptContext instance", e);
+      return null;
+    }
+  }
+}