You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:37 UTC

[35/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
new file mode 100644
index 0000000..84c720c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Manages the state of a pipeline execution.
+ * 
+ */
+public interface Pipeline {
+
+  /**
+   * Set the {@code Configuration} to use with this pipeline.
+   */
+  void setConfiguration(Configuration conf);
+
+  /**
+   * Returns the name of this pipeline.
+   * 
+   * @return Name of the pipeline
+   */
+  String getName();
+
+  /**
+   * Returns the {@code Configuration} instance associated with this pipeline.
+   */
+  Configuration getConfiguration();
+
+  /**
+   * Converts the given {@code Source} into a {@code PCollection} that is
+   * available to jobs run using this {@code Pipeline} instance.
+   * 
+   * @param source
+   *          The source of data
+   * @return A PCollection that references the given source
+   */
+  <T> PCollection<T> read(Source<T> source);
+
+  /**
+   * A version of the read method for {@code TableSource} instances that map to
+   * {@code PTable}s.
+   * 
+   * @param tableSource
+   *          The source of the data
+   * @return A PTable that references the given source
+   */
+  <K, V> PTable<K, V> read(TableSource<K, V> tableSource);
+
+  /**
+   * Write the given collection to the given target on the next pipeline run. The
+   * system will check to see if the target's location already exists using the
+   * {@code WriteMode.DEFAULT} rule for the given {@code Target}.
+   * 
+   * @param collection
+   *          The collection
+   * @param target
+   *          The output target
+   */
+  void write(PCollection<?> collection, Target target);
+
+  /**
+  * Write the contents of the {@code PCollection} to the given {@code Target},
+  * using the storage format specified by the target and the given
+  * {@code WriteMode} for cases where the referenced {@code Target}
+  * already exists.
+  *
+  * @param collection
+  *          The collection
+  * @param target
+  *          The target to write to
+  * @param writeMode
+  *          The strategy to use for handling existing outputs
+  */
+ void write(PCollection<?> collection, Target target,
+     Target.WriteMode writeMode);
+
+ /**
+   * Create the given PCollection and read the data it contains into the
+   * returned Collection instance for client use.
+   * 
+   * @param pcollection
+   *          The PCollection to materialize
+   * @return the data from the PCollection as a read-only Collection
+   */
+  <T> Iterable<T> materialize(PCollection<T> pcollection);
+
+  /**
+   * Constructs and executes a series of MapReduce jobs in order to write data
+   * to the output targets.
+   */
+  PipelineResult run();
+
+  /**
+   * Constructs and starts a series of MapReduce jobs in order ot write data to
+   * the output targets, but returns a {@code ListenableFuture} to allow clients to control
+   * job execution.
+   * @return
+   */
+  PipelineExecution runAsync();
+  
+  /**
+   * Run any remaining jobs required to generate outputs and then clean up any
+   * intermediate data files that were created in this run or previous calls to
+   * {@code run}.
+   */
+  PipelineResult done();
+
+  /**
+   * A convenience method for reading a text file.
+   */
+  PCollection<String> readTextFile(String pathName);
+
+  /**
+   * A convenience method for writing a text file.
+   */
+  <T> void writeTextFile(PCollection<T> collection, String pathName);
+
+  /**
+   * Turn on debug logging for jobs that are run from this pipeline.
+   */
+  void enableDebug();
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java b/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
new file mode 100644
index 0000000..fc6bb91
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A handle to allow clients to control a Crunch pipeline as it runs.
+ *
+ * This interface is thread-safe.
+ */
+public interface PipelineExecution {
+
+  enum Status { READY, RUNNING, SUCCEEDED, FAILED, KILLED }
+
+  /** Returns the .dot file that allows a client to graph the Crunch execution plan for this
+   * pipeline.
+   */
+  String getPlanDotFile();
+
+  /** Blocks until pipeline completes or the specified waiting time elapsed. */
+   void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException;
+
+   /** Blocks until pipeline completes, i.e. {@code SUCCEEDED}, {@code FAILED} or {@code KILLED}. */
+  void waitUntilDone() throws InterruptedException;
+
+  Status getStatus();
+
+  /** Retrieve the result of a pipeline if it has been completed, otherwise {@code null}. */
+  PipelineResult getResult();
+
+  /**
+   * Kills the pipeline if it is running, no-op otherwise.
+   *
+   * This method only delivers a kill signal to the pipeline, and does not guarantee the pipeline exits on return.
+   * To wait for completely exits, use {@link #waitUntilDone()} after this call.
+   */
+  void kill() throws InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
new file mode 100644
index 0000000..90b1067
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Container for the results of a call to {@code run} or {@code done} on the
+ * Pipeline interface that includes details and statistics about the component
+ * stages of the data pipeline.
+ */
+public class PipelineResult {
+
+  public static class StageResult {
+
+    private final String stageName;
+    private final Counters counters;
+
+    public StageResult(String stageName, Counters counters) {
+      this.stageName = stageName;
+      this.counters = counters;
+    }
+
+    public String getStageName() {
+      return stageName;
+    }
+
+    public Counters getCounters() {
+      return counters;
+    }
+
+    public Counter findCounter(Enum<?> key) {
+      return counters.findCounter(key);
+    }
+
+    public long getCounterValue(Enum<?> key) {
+      return findCounter(key).getValue();
+    }
+  }
+
+  public static final PipelineResult EMPTY = new PipelineResult(ImmutableList.<StageResult> of());
+
+  private final List<StageResult> stageResults;
+
+  public PipelineResult(List<StageResult> stageResults) {
+    this.stageResults = ImmutableList.copyOf(stageResults);
+  }
+
+  public boolean succeeded() {
+    return !stageResults.isEmpty();
+  }
+
+  public List<StageResult> getStageResults() {
+    return stageResults;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/Source.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Source.java b/crunch-core/src/main/java/org/apache/crunch/Source.java
new file mode 100644
index 0000000..f54d135
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Source.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.IOException;
+
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * A {@code Source} represents an input data set that is an input to one or more
+ * MapReduce jobs.
+ * 
+ */
+public interface Source<T> {
+  /**
+   * Returns the {@code PType} for this source.
+   */
+  PType<T> getType();
+
+  /**
+   * Configure the given job to use this source as an input.
+   * 
+   * @param job
+   *          The job to configure
+   * @param inputId
+   *          For a multi-input job, an identifier for this input to the job
+   * @throws IOException
+   */
+  void configureSource(Job job, int inputId) throws IOException;
+
+  /**
+   * Returns the number of bytes in this {@code Source}.
+   */
+  long getSize(Configuration configuration);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
new file mode 100644
index 0000000..09c03c6
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * An interface for classes that implement both the {@code Source} and the
+ * {@code Target} interfaces.
+ *
+ */
+public interface SourceTarget<T> extends Source<T>, Target {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/TableSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/TableSource.java b/crunch-core/src/main/java/org/apache/crunch/TableSource.java
new file mode 100644
index 0000000..ff27346
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/TableSource.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.crunch.types.PTableType;
+
+/**
+ * The interface {@code Source} implementations that return a {@link PTable}.
+ * 
+ */
+public interface TableSource<K, V> extends Source<Pair<K, V>> {
+  PTableType<K, V> getTableType();
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/TableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/TableSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/TableSourceTarget.java
new file mode 100644
index 0000000..9b1ed34
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/TableSourceTarget.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * An interface for classes that implement both the {@code TableSource} and the
+ * {@code Target} interfaces.
+ */
+public interface TableSourceTarget<K, V> extends TableSource<K, V>, SourceTarget<Pair<K, V>> {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/Target.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Target.java b/crunch-core/src/main/java/org/apache/crunch/Target.java
new file mode 100644
index 0000000..0a0c23d
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Target.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@code Target} represents the output destination of a Crunch {@code PCollection}
+ * in the context of a Crunch job.
+ */
+public interface Target {
+
+  /**
+   * An enum to represent different options the client may specify
+   * for handling the case where the output path, table, etc. referenced
+   * by a {@code Target} already exists.
+   */
+  enum WriteMode {
+    /**
+     * Check to see if the output target already exists before running
+     * the pipeline, and if it does, print an error and throw an exception.
+     */
+    DEFAULT,
+    
+    /**
+     * Check to see if the output target already exists, and if it does,
+     * delete it and overwrite it with the new output (if any).
+     */
+    OVERWRITE,
+
+    /**
+     * If the output target does not exist, create it. If it does exist,
+     * add the output of this pipeline to the target. This was the
+     * behavior in Crunch up to version 0.4.0.
+     */
+    APPEND
+  }
+
+  /**
+   * Apply the given {@code WriteMode} to this {@code Target} instance.
+   * 
+   * @param writeMode The strategy for handling existing outputs
+   * @param conf The ever-useful {@code Configuration} instance
+   */
+  void handleExisting(WriteMode writeMode, Configuration conf);
+  
+  /**
+   * Checks to see if this {@code Target} instance is compatible with the
+   * given {@code PType}.
+   * 
+   * @param handler The {@link OutputHandler} that is managing the output for the job
+   * @param ptype The {@code PType} to check
+   * @return True if this Target can write data in the form of the given {@code PType},
+   * false otherwise
+   */
+  boolean accept(OutputHandler handler, PType<?> ptype);
+
+  /**
+   * Attempt to create the {@code SourceTarget} type that corresponds to this {@code Target}
+   * for the given {@code PType}, if possible. If it is not possible, return {@code null}.
+   * 
+   * @param ptype The {@code PType} to use in constructing the {@code SourceTarget}
+   * @return A new {@code SourceTarget} or null if such a {@code SourceTarget} does not exist
+   */
+  <T> SourceTarget<T> asSourceTarget(PType<T> ptype);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/Tuple.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Tuple.java b/crunch-core/src/main/java/org/apache/crunch/Tuple.java
new file mode 100644
index 0000000..4e602ff
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Tuple.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * A fixed-size collection of Objects, used in Crunch for representing joins
+ * between {@code PCollection}s.
+ * 
+ */
+public interface Tuple {
+
+  /**
+   * Returns the Object at the given index.
+   */
+  Object get(int index);
+
+  /**
+   * Returns the number of elements in this Tuple.
+   */
+  int size();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/Tuple3.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Tuple3.java b/crunch-core/src/main/java/org/apache/crunch/Tuple3.java
new file mode 100644
index 0000000..4372811
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Tuple3.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * A convenience class for three-element {@link Tuple}s.
+ */
+public class Tuple3<V1, V2, V3> implements Tuple {
+
+  private final V1 first;
+  private final V2 second;
+  private final V3 third;
+
+  public static <A, B, C> Tuple3<A, B, C> of(A a, B b, C c) {
+    return new Tuple3<A, B, C>(a, b, c);
+  }
+
+  public Tuple3(V1 first, V2 second, V3 third) {
+    this.first = first;
+    this.second = second;
+    this.third = third;
+  }
+
+  public V1 first() {
+    return first;
+  }
+
+  public V2 second() {
+    return second;
+  }
+
+  public V3 third() {
+    return third;
+  }
+
+  public Object get(int index) {
+    switch (index) {
+    case 0:
+      return first;
+    case 1:
+      return second;
+    case 2:
+      return third;
+    default:
+      throw new ArrayIndexOutOfBoundsException();
+    }
+  }
+
+  public int size() {
+    return 3;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    return hcb.append(first).append(second).append(third).toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    Tuple3<?, ?, ?> other = (Tuple3<?, ?, ?>) obj;
+    return (first == other.first || (first != null && first.equals(other.first)))
+        && (second == other.second || (second != null && second.equals(other.second)))
+        && (third == other.third || (third != null && third.equals(other.third)));
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Tuple3[");
+    sb.append(first).append(",").append(second).append(",").append(third);
+    return sb.append("]").toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/Tuple4.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Tuple4.java b/crunch-core/src/main/java/org/apache/crunch/Tuple4.java
new file mode 100644
index 0000000..f161371
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Tuple4.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * A convenience class for four-element {@link Tuple}s.
+ */
+public class Tuple4<V1, V2, V3, V4> implements Tuple {
+
+  private final V1 first;
+  private final V2 second;
+  private final V3 third;
+  private final V4 fourth;
+
+  public static <A, B, C, D> Tuple4<A, B, C, D> of(A a, B b, C c, D d) {
+    return new Tuple4<A, B, C, D>(a, b, c, d);
+  }
+
+  public Tuple4(V1 first, V2 second, V3 third, V4 fourth) {
+    this.first = first;
+    this.second = second;
+    this.third = third;
+    this.fourth = fourth;
+  }
+
+  public V1 first() {
+    return first;
+  }
+
+  public V2 second() {
+    return second;
+  }
+
+  public V3 third() {
+    return third;
+  }
+
+  public V4 fourth() {
+    return fourth;
+  }
+
+  public Object get(int index) {
+    switch (index) {
+    case 0:
+      return first;
+    case 1:
+      return second;
+    case 2:
+      return third;
+    case 3:
+      return fourth;
+    default:
+      throw new ArrayIndexOutOfBoundsException();
+    }
+  }
+
+  public int size() {
+    return 4;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    return hcb.append(first).append(second).append(third).append(fourth).toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    Tuple4<?, ?, ?, ?> other = (Tuple4<?, ?, ?, ?>) obj;
+    return (first == other.first || (first != null && first.equals(other.first)))
+        && (second == other.second || (second != null && second.equals(other.second)))
+        && (third == other.third || (third != null && third.equals(other.third)))
+        && (fourth == other.fourth || (fourth != null && fourth.equals(other.fourth)));
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Tuple4[");
+    sb.append(first).append(",").append(second).append(",").append(third);
+    return sb.append(",").append(fourth).append("]").toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/TupleN.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/TupleN.java b/crunch-core/src/main/java/org/apache/crunch/TupleN.java
new file mode 100644
index 0000000..e5eceb5
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/TupleN.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.Arrays;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * A {@link Tuple} instance for an arbitrary number of values.
+ */
+public class TupleN implements Tuple {
+
+  private final Object values[];
+
+  public static TupleN of(Object... values) {
+    return new TupleN(values);
+  }
+
+  public TupleN(Object... values) {
+    this.values = new Object[values.length];
+    System.arraycopy(values, 0, this.values, 0, values.length);
+  }
+
+  public Object get(int index) {
+    return values[index];
+  }
+
+  public int size() {
+    return values.length;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    for (Object v : values) {
+      hcb.append(v);
+    }
+    return hcb.toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    TupleN other = (TupleN) obj;
+    return Arrays.equals(this.values, other.values);
+  }
+
+  @Override
+  public String toString() {
+    return Arrays.toString(values);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
new file mode 100644
index 0000000..0ac79e2
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
@@ -0,0 +1,1111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import java.math.BigInteger;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.apache.crunch.Aggregator;
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.util.Tuples;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+
+/**
+ * A collection of pre-defined {@link org.apache.crunch.Aggregator}s.
+ *
+ * <p>The factory methods of this class return {@link org.apache.crunch.Aggregator}
+ * instances that you can use to combine the values of a {@link PGroupedTable}.
+ * In most cases, they turn a multimap (multiple entries per key) into a map (one
+ * entry per key).</p>
+ *
+ * <p><strong>Note</strong>: When using composed aggregators, like those built by the
+ * {@link #pairAggregator(Aggregator, Aggregator) pairAggregator()}
+ * factory method, you typically don't want to put in the same child aggregator more than once,
+ * even if all child aggregators have the same type. In most cases, this is what you want:</p>
+ *
+ * <pre>
+ *   PTable&lt;K, Long&gt; result = groupedTable.combineValues(
+ *      pairAggregator(SUM_LONGS(), SUM_LONGS())
+ *   );
+ * </pre>
+ */
+public final class Aggregators {
+
+  private Aggregators() {
+    // utility class, not for instantiation
+  }
+
+  /**
+   * Sum up all {@code long} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Long> SUM_LONGS() {
+    return new SumLongs();
+  }
+
+  /**
+   * Sum up all {@code int} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Integer> SUM_INTS() {
+    return new SumInts();
+  }
+
+  /**
+   * Sum up all {@code float} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Float> SUM_FLOATS() {
+    return new SumFloats();
+  }
+
+  /**
+   * Sum up all {@code double} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Double> SUM_DOUBLES() {
+    return new SumDoubles();
+  }
+
+  /**
+   * Sum up all {@link BigInteger} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<BigInteger> SUM_BIGINTS() {
+    return new SumBigInts();
+  }
+
+  /**
+   * Return the maximum of all given {@code long} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Long> MAX_LONGS() {
+    return new MaxLongs();
+  }
+
+  /**
+   * Return the {@code n} largest {@code long} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Long> MAX_LONGS(int n) {
+    return new MaxLongs();
+  }
+
+  /**
+   * Return the maximum of all given {@code int} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Integer> MAX_INTS() {
+    return new MaxInts();
+  }
+
+  /**
+   * Return the {@code n} largest {@code int} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Integer> MAX_INTS(int n) {
+    return new MaxNAggregator<Integer>(n);
+  }
+
+  /**
+   * Return the maximum of all given {@code float} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Float> MAX_FLOATS() {
+    return new MaxFloats();
+  }
+
+  /**
+   * Return the {@code n} largest {@code float} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Float> MAX_FLOATS(int n) {
+    return new MaxNAggregator<Float>(n);
+  }
+
+  /**
+   * Return the maximum of all given {@code double} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Double> MAX_DOUBLES() {
+    return new MaxDoubles();
+  }
+
+  /**
+   * Return the {@code n} largest {@code double} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Double> MAX_DOUBLES(int n) {
+    return new MaxNAggregator<Double>(n);
+  }
+
+  /**
+   * Return the maximum of all given {@link BigInteger} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<BigInteger> MAX_BIGINTS() {
+    return new MaxBigInts();
+  }
+
+  /**
+   * Return the {@code n} largest {@link BigInteger} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<BigInteger> MAX_BIGINTS(int n) {
+    return new MaxNAggregator<BigInteger>(n);
+  }
+
+  /**
+   * Return the {@code n} largest values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @param cls The type of the values to aggregate (must implement {@link Comparable}!)
+   * @return The newly constructed instance
+   */
+  public static <V extends Comparable<V>> Aggregator<V> MAX_N(int n, Class<V> cls) {
+    return new MaxNAggregator<V>(n);
+  }
+
+  /**
+   * Return the minimum of all given {@code long} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Long> MIN_LONGS() {
+    return new MinLongs();
+  }
+
+  /**
+   * Return the {@code n} smallest {@code long} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Long> MIN_LONGS(int n) {
+    return new MinNAggregator<Long>(n);
+  }
+
+  /**
+   * Return the minimum of all given {@code int} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Integer> MIN_INTS() {
+    return new MinInts();
+  }
+
+  /**
+   * Return the {@code n} smallest {@code int} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Integer> MIN_INTS(int n) {
+    return new MinNAggregator<Integer>(n);
+  }
+
+  /**
+   * Return the minimum of all given {@code float} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Float> MIN_FLOATS() {
+    return new MinFloats();
+  }
+
+  /**
+   * Return the {@code n} smallest {@code float} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Float> MIN_FLOATS(int n) {
+    return new MinNAggregator<Float>(n);
+  }
+
+  /**
+   * Return the minimum of all given {@code double} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Double> MIN_DOUBLES() {
+    return new MinDoubles();
+  }
+
+  /**
+   * Return the {@code n} smallest {@code double} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Double> MIN_DOUBLES(int n) {
+    return new MinNAggregator<Double>(n);
+  }
+
+  /**
+   * Return the minimum of all given {@link BigInteger} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<BigInteger> MIN_BIGINTS() {
+    return new MinBigInts();
+  }
+
+  /**
+   * Return the {@code n} smallest {@link BigInteger} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<BigInteger> MIN_BIGINTS(int n) {
+    return new MinNAggregator<BigInteger>(n);
+  }
+
+  /**
+   * Return the {@code n} smallest values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @param cls The type of the values to aggregate (must implement {@link Comparable}!)
+   * @return The newly constructed instance
+   */
+  public static <V extends Comparable<V>> Aggregator<V> MIN_N(int n, Class<V> cls) {
+    return new MinNAggregator<V>(n);
+  }
+
+  /**
+   * Return the first {@code n} values (or fewer if there are fewer values than {@code n}).
+   *
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static <V> Aggregator<V> FIRST_N(int n) {
+    return new FirstNAggregator<V>(n);
+  }
+
+  /**
+   * Return the last {@code n} values (or fewer if there are fewer values than {@code n}).
+   *
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static <V> Aggregator<V> LAST_N(int n) {
+    return new LastNAggregator<V>(n);
+  }
+  
+  /**
+   * Concatenate strings, with a separator between strings. There
+   * is no limits of length for the concatenated string.
+   *
+   * <p><em>Note: String concatenation is not commutative, which means the
+   * result of the aggregation is not deterministic!</em></p>
+   *
+   * @param separator
+   *            the separator which will be appended between each string
+   * @param skipNull
+   *            define if we should skip null values. Throw
+   *            NullPointerException if set to false and there is a null
+   *            value.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<String> STRING_CONCAT(String separator, boolean skipNull) {
+    return new StringConcatAggregator(separator, skipNull);
+  }
+
+  /**
+   * Concatenate strings, with a separator between strings. You can specify
+   * the maximum length of the output string and of the input strings, if
+   * they are &gt; 0. If a value is &lt;= 0, there is no limit.
+   *
+   * <p>Any too large string (or any string which would made the output too
+   * large) will be silently discarded.</p>
+   *
+   * <p><em>Note: String concatenation is not commutative, which means the
+   * result of the aggregation is not deterministic!</em></p>
+   *
+   * @param separator
+   *            the separator which will be appended between each string
+   * @param skipNull
+   *            define if we should skip null values. Throw
+   *            NullPointerException if set to false and there is a null
+   *            value.
+   * @param maxOutputLength
+   *            the maximum length of the output string. If it's set &lt;= 0,
+   *            there is no limit. The number of characters of the output
+   *            string will be &lt; maxOutputLength.
+   * @param maxInputLength
+   *            the maximum length of the input strings. If it's set <= 0,
+   *            there is no limit. The number of characters of the input string
+   *            will be &lt; maxInputLength to be concatenated.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<String> STRING_CONCAT(String separator, boolean skipNull,
+      long maxOutputLength, long maxInputLength) {
+    return new StringConcatAggregator(separator, skipNull, maxOutputLength, maxInputLength);
+  }
+
+  /**
+   * Collect the unique elements of the input, as defined by the {@code equals} method for
+   * the input objects. No guarantees are made about the order in which the final elements
+   * will be returned.
+   * 
+   * @return The newly constructed instance
+   */
+  public static <V> Aggregator<V> UNIQUE_ELEMENTS() {
+    return new SetAggregator<V>();
+  }
+  
+  /**
+   * Collect a sample of unique elements from the input, where 'unique' is defined by
+   * the {@code equals} method for the input objects. No guarantees are made about which
+   * elements will be returned, simply that there will not be any more than the given sample
+   * size for any key.
+   * 
+   * @param maximumSampleSize The maximum number of unique elements to return per key
+   * @return The newly constructed instance
+   */
+  public static <V> Aggregator<V> SAMPLE_UNIQUE_ELEMENTS(int maximumSampleSize) {
+    return new SetAggregator<V>(maximumSampleSize);
+  }
+  
+  /**
+   * Apply separate aggregators to each component of a {@link Pair}.
+   */
+  public static <V1, V2> Aggregator<Pair<V1, V2>> pairAggregator(
+      Aggregator<V1> a1, Aggregator<V2> a2) {
+    return new PairAggregator<V1, V2>(a1, a2);
+  }
+
+  /**
+   * Apply separate aggregators to each component of a {@link Tuple3}.
+   */
+  public static <V1, V2, V3> Aggregator<Tuple3<V1, V2, V3>> tripAggregator(
+      Aggregator<V1> a1, Aggregator<V2> a2, Aggregator<V3> a3) {
+    return new TripAggregator<V1, V2, V3>(a1, a2, a3);
+  }
+
+  /**
+   * Apply separate aggregators to each component of a {@link Tuple4}.
+   */
+  public static <V1, V2, V3, V4> Aggregator<Tuple4<V1, V2, V3, V4>> quadAggregator(
+      Aggregator<V1> a1, Aggregator<V2> a2, Aggregator<V3> a3, Aggregator<V4> a4) {
+    return new QuadAggregator<V1, V2, V3, V4>(a1, a2, a3, a4);
+  }
+
+  /**
+   * Apply separate aggregators to each component of a {@link Tuple}.
+   */
+  public static Aggregator<TupleN> tupleAggregator(Aggregator<?>... aggregators) {
+    return new TupleNAggregator(aggregators);
+  }
+
+  /**
+   * Wrap a {@link CombineFn} adapter around the given aggregator.
+   *
+   * @param aggregator The instance to wrap
+   * @return A {@link CombineFn} delegating to {@code aggregator}
+   */
+  public static final <K, V> CombineFn<K, V> toCombineFn(Aggregator<V> aggregator) {
+    return new AggregatorCombineFn<K, V>(aggregator);
+  }
+
+  /**
+   * Base class for aggregators that do not require any initialization.
+   */
+  public static abstract class SimpleAggregator<T> implements Aggregator<T> {
+    @Override
+    public void initialize(Configuration conf) {
+      // No-op
+    }
+  }
+
+  /**
+   * A {@code CombineFn} that delegates all of the actual work to an
+   * {@code Aggregator} instance.
+   */
+  private static class AggregatorCombineFn<K, V> extends CombineFn<K, V> {
+    // TODO: Has to be fully qualified until CombineFn.Aggregator can be removed.
+    private final org.apache.crunch.Aggregator<V> aggregator;
+
+    public AggregatorCombineFn(org.apache.crunch.Aggregator<V> aggregator) {
+      this.aggregator = aggregator;
+    }
+
+    @Override
+    public void initialize() {
+      aggregator.initialize(getConfiguration());
+    }
+
+    @Override
+    public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
+      aggregator.reset();
+      for (V v : input.second()) {
+        aggregator.update(v);
+      }
+      for (V v : aggregator.results()) {
+        emitter.emit(Pair.of(input.first(), v));
+      }
+    }
+  }
+
+  private static class SumLongs extends SimpleAggregator<Long> {
+    private long sum = 0;
+
+    @Override
+    public void reset() {
+      sum = 0;
+    }
+
+    @Override
+    public void update(Long next) {
+      sum += next;
+    }
+
+    @Override
+    public Iterable<Long> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+
+  private static class SumInts extends SimpleAggregator<Integer> {
+    private int sum = 0;
+
+    @Override
+    public void reset() {
+      sum = 0;
+    }
+
+    @Override
+    public void update(Integer next) {
+      sum += next;
+    }
+
+    @Override
+    public Iterable<Integer> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+
+  private static class SumFloats extends SimpleAggregator<Float> {
+    private float sum = 0;
+
+    @Override
+    public void reset() {
+      sum = 0f;
+    }
+
+    @Override
+    public void update(Float next) {
+      sum += next;
+    }
+
+    @Override
+    public Iterable<Float> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+
+  private static class SumDoubles extends SimpleAggregator<Double> {
+    private double sum = 0;
+
+    @Override
+    public void reset() {
+      sum = 0f;
+    }
+
+    @Override
+    public void update(Double next) {
+      sum += next;
+    }
+
+    @Override
+    public Iterable<Double> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+
+  private static class SumBigInts extends SimpleAggregator<BigInteger> {
+    private BigInteger sum = BigInteger.ZERO;
+
+    @Override
+    public void reset() {
+      sum = BigInteger.ZERO;
+    }
+
+    @Override
+    public void update(BigInteger next) {
+      sum = sum.add(next);
+    }
+
+    @Override
+    public Iterable<BigInteger> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+
+  private static class MaxLongs extends SimpleAggregator<Long> {
+    private Long max = null;
+
+    @Override
+    public void reset() {
+      max = null;
+    }
+
+    @Override
+    public void update(Long next) {
+      if (max == null || max < next) {
+        max = next;
+      }
+    }
+
+    @Override
+    public Iterable<Long> results() {
+      return ImmutableList.of(max);
+    }
+  }
+
+  private static class MaxInts extends SimpleAggregator<Integer> {
+    private Integer max = null;
+
+    @Override
+    public void reset() {
+      max = null;
+    }
+
+    @Override
+    public void update(Integer next) {
+      if (max == null || max < next) {
+        max = next;
+      }
+    }
+
+    @Override
+    public Iterable<Integer> results() {
+      return ImmutableList.of(max);
+    }
+  }
+
+  private static class MaxFloats extends SimpleAggregator<Float> {
+    private Float max = null;
+
+    @Override
+    public void reset() {
+      max = null;
+    }
+
+    @Override
+    public void update(Float next) {
+      if (max == null || max < next) {
+        max = next;
+      }
+    }
+
+    @Override
+    public Iterable<Float> results() {
+      return ImmutableList.of(max);
+    }
+  }
+
+  private static class MaxDoubles extends SimpleAggregator<Double> {
+    private Double max = null;
+
+    @Override
+    public void reset() {
+      max = null;
+    }
+
+    @Override
+    public void update(Double next) {
+      if (max == null || max < next) {
+        max = next;
+      }
+    }
+
+    @Override
+    public Iterable<Double> results() {
+      return ImmutableList.of(max);
+    }
+  }
+
+  private static class MaxBigInts extends SimpleAggregator<BigInteger> {
+    private BigInteger max = null;
+
+    @Override
+    public void reset() {
+      max = null;
+    }
+
+    @Override
+    public void update(BigInteger next) {
+      if (max == null || max.compareTo(next) < 0) {
+        max = next;
+      }
+    }
+
+    @Override
+    public Iterable<BigInteger> results() {
+      return ImmutableList.of(max);
+    }
+  }
+
+  private static class MinLongs extends SimpleAggregator<Long> {
+    private Long min = null;
+
+    @Override
+    public void reset() {
+      min = null;
+    }
+
+    @Override
+    public void update(Long next) {
+      if (min == null || min > next) {
+        min = next;
+      }
+    }
+
+    @Override
+    public Iterable<Long> results() {
+      return ImmutableList.of(min);
+    }
+  }
+
+  private static class MinInts extends SimpleAggregator<Integer> {
+    private Integer min = null;
+
+    @Override
+    public void reset() {
+      min = null;
+    }
+
+    @Override
+    public void update(Integer next) {
+      if (min == null || min > next) {
+        min = next;
+      }
+    }
+
+    @Override
+    public Iterable<Integer> results() {
+      return ImmutableList.of(min);
+    }
+  }
+
+  private static class MinFloats extends SimpleAggregator<Float> {
+    private Float min = null;
+
+    @Override
+    public void reset() {
+      min = null;
+    }
+
+    @Override
+    public void update(Float next) {
+      if (min == null || min > next) {
+        min = next;
+      }
+    }
+
+    @Override
+    public Iterable<Float> results() {
+      return ImmutableList.of(min);
+    }
+  }
+
+  private static class MinDoubles extends SimpleAggregator<Double> {
+    private Double min = null;
+
+    @Override
+    public void reset() {
+      min = null;
+    }
+
+    @Override
+    public void update(Double next) {
+      if (min == null || min > next) {
+        min = next;
+      }
+    }
+
+    @Override
+    public Iterable<Double> results() {
+      return ImmutableList.of(min);
+    }
+  }
+
+  private static class MinBigInts extends SimpleAggregator<BigInteger> {
+    private BigInteger min = null;
+
+    @Override
+    public void reset() {
+      min = null;
+    }
+
+    @Override
+    public void update(BigInteger next) {
+      if (min == null || min.compareTo(next) > 0) {
+        min = next;
+      }
+    }
+
+    @Override
+    public Iterable<BigInteger> results() {
+      return ImmutableList.of(min);
+    }
+  }
+
+  private static class MaxNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
+    private final int arity;
+    private transient SortedSet<V> elements;
+
+    public MaxNAggregator(int arity) {
+      this.arity = arity;
+    }
+
+    @Override
+    public void reset() {
+      if (elements == null) {
+        elements = Sets.newTreeSet();
+      } else {
+        elements.clear();
+      }
+    }
+
+    @Override
+    public void update(V value) {
+      if (elements.size() < arity) {
+        elements.add(value);
+      } else if (value.compareTo(elements.first()) > 0) {
+        elements.remove(elements.first());
+        elements.add(value);
+      }
+    }
+
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+
+  private static class MinNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
+    private final int arity;
+    private transient SortedSet<V> elements;
+
+    public MinNAggregator(int arity) {
+      this.arity = arity;
+    }
+
+    @Override
+    public void reset() {
+      if (elements == null) {
+        elements = Sets.newTreeSet();
+      } else {
+        elements.clear();
+      }
+    }
+
+    @Override
+    public void update(V value) {
+      if (elements.size() < arity) {
+        elements.add(value);
+      } else if (value.compareTo(elements.last()) < 0) {
+        elements.remove(elements.last());
+        elements.add(value);
+      }
+    }
+
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+
+  private static class FirstNAggregator<V> extends SimpleAggregator<V> {
+    private final int arity;
+    private final List<V> elements;
+
+    public FirstNAggregator(int arity) {
+      this.arity = arity;
+      this.elements = Lists.newArrayList();
+    }
+
+    @Override
+    public void reset() {
+      elements.clear();
+    }
+
+    @Override
+    public void update(V value) {
+      if (elements.size() < arity) {
+        elements.add(value);
+      }
+    }
+
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+
+  private static class LastNAggregator<V> extends SimpleAggregator<V> {
+    private final int arity;
+    private final LinkedList<V> elements;
+
+    public LastNAggregator(int arity) {
+      this.arity = arity;
+      this.elements = Lists.newLinkedList();
+    }
+
+    @Override
+    public void reset() {
+      elements.clear();
+    }
+
+    @Override
+    public void update(V value) {
+      elements.add(value);
+      if (elements.size() == arity + 1) {
+        elements.removeFirst();
+      }
+    }
+
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+
+  private static class StringConcatAggregator extends SimpleAggregator<String> {
+    private final String separator;
+    private final boolean skipNulls;
+    private final long maxOutputLength;
+    private final long maxInputLength;
+    private long currentLength;
+    private final LinkedList<String> list = new LinkedList<String>();
+
+    private transient Joiner joiner;
+
+    public StringConcatAggregator(final String separator, final boolean skipNulls) {
+      this.separator = separator;
+      this.skipNulls = skipNulls;
+      this.maxInputLength = 0;
+      this.maxOutputLength = 0;
+    }
+
+    public StringConcatAggregator(final String separator, final boolean skipNull, final long maxOutputLength, final long maxInputLength) {
+      this.separator = separator;
+      this.skipNulls = skipNull;
+      this.maxOutputLength = maxOutputLength;
+      this.maxInputLength = maxInputLength;
+      this.currentLength = -separator.length();
+    }
+
+    @Override
+    public void reset() {
+      if (joiner == null) {
+        joiner = skipNulls ? Joiner.on(separator).skipNulls() : Joiner.on(separator);
+      }
+      currentLength = -separator.length();
+      list.clear();
+    }
+
+    @Override
+    public void update(final String next) {
+      long length = (next == null) ? 0 : next.length() + separator.length();
+      if (maxOutputLength > 0 && currentLength + length > maxOutputLength || maxInputLength > 0 && next.length() > maxInputLength) {
+        return;
+      }
+      if (maxOutputLength > 0) {
+        currentLength += length;
+      }
+      list.add(next);
+    }
+
+    @Override
+    public Iterable<String> results() {
+      return ImmutableList.of(joiner.join(list));
+    }
+  }
+
+
+  private static abstract class TupleAggregator<T> implements Aggregator<T> {
+    private final List<Aggregator<Object>> aggregators;
+
+    @SuppressWarnings("unchecked")
+    public TupleAggregator(Aggregator<?>... aggregators) {
+      this.aggregators = Lists.newArrayList();
+      for (Aggregator<?> a : aggregators) {
+        this.aggregators.add((Aggregator<Object>) a);
+      }
+    }
+
+    @Override
+    public void initialize(Configuration configuration) {
+      for (Aggregator<?> a : aggregators) {
+        a.initialize(configuration);
+      }
+    }
+
+    @Override
+    public void reset() {
+      for (Aggregator<?> a : aggregators) {
+        a.reset();
+      }
+    }
+
+    protected void updateTuple(Tuple t) {
+      for (int i = 0; i < aggregators.size(); i++) {
+        aggregators.get(i).update(t.get(i));
+      }
+    }
+
+    protected Iterable<Object> results(int index) {
+      return aggregators.get(index).results();
+    }
+  }
+
+  private static class PairAggregator<V1, V2> extends TupleAggregator<Pair<V1, V2>> {
+
+    public PairAggregator(Aggregator<V1> a1, Aggregator<V2> a2) {
+      super(a1, a2);
+    }
+
+    @Override
+    public void update(Pair<V1, V2> value) {
+      updateTuple(value);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Iterable<Pair<V1, V2>> results() {
+      return new Tuples.PairIterable<V1, V2>((Iterable<V1>) results(0), (Iterable<V2>) results(1));
+    }
+  }
+
+  private static class TripAggregator<A, B, C> extends TupleAggregator<Tuple3<A, B, C>> {
+
+    public TripAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3) {
+      super(a1, a2, a3);
+    }
+
+    @Override
+    public void update(Tuple3<A, B, C> value) {
+      updateTuple(value);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Iterable<Tuple3<A, B, C>> results() {
+      return new Tuples.TripIterable<A, B, C>((Iterable<A>) results(0), (Iterable<B>) results(1),
+          (Iterable<C>) results(2));
+    }
+  }
+
+  private static class QuadAggregator<A, B, C, D> extends TupleAggregator<Tuple4<A, B, C, D>> {
+
+    public QuadAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3, Aggregator<D> a4) {
+      super(a1, a2, a3, a4);
+    }
+
+    @Override
+    public void update(Tuple4<A, B, C, D> value) {
+      updateTuple(value);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Iterable<Tuple4<A, B, C, D>> results() {
+      return new Tuples.QuadIterable<A, B, C, D>((Iterable<A>) results(0), (Iterable<B>) results(1),
+          (Iterable<C>) results(2), (Iterable<D>) results(3));
+    }
+  }
+
+  private static class TupleNAggregator extends TupleAggregator<TupleN> {
+    private final int size;
+
+    public TupleNAggregator(Aggregator<?>... aggregators) {
+      super(aggregators);
+      size = aggregators.length;
+    }
+
+    @Override
+    public void update(TupleN value) {
+      updateTuple(value);
+    }
+
+    @Override
+    public Iterable<TupleN> results() {
+      Iterable<?>[] iterables = new Iterable[size];
+      for (int i = 0; i < size; i++) {
+        iterables[i] = results(i);
+      }
+      return new Tuples.TupleNIterable(iterables);
+    }
+  }
+
+  private static class SetAggregator<V> extends SimpleAggregator<V> {
+    private final Set<V> elements;
+    private final int sizeLimit;
+    
+    public SetAggregator() {
+      this(-1);
+    }
+    
+    public SetAggregator(int sizeLimit) {
+      this.elements = Sets.newHashSet();
+      this.sizeLimit = sizeLimit;
+    }
+    
+    @Override
+    public void reset() {
+      elements.clear();
+    }
+
+    @Override
+    public void update(V value) {
+      if (sizeLimit == -1 || elements.size() < sizeLimit) {
+        elements.add(value);
+      }
+    }
+
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/CompositeMapFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
new file mode 100644
index 0000000..2a8e7d9
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+public class CompositeMapFn<R, S, T> extends MapFn<R, T> {
+
+  private final MapFn<R, S> first;
+  private final MapFn<S, T> second;
+
+  public CompositeMapFn(MapFn<R, S> first, MapFn<S, T> second) {
+    this.first = first;
+    this.second = second;
+  }
+
+  @Override
+  public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+    first.setContext(context);
+    second.setContext(context);
+  }
+  
+  @Override
+  public void initialize() {
+    first.initialize();
+    second.initialize();
+  }
+
+  public MapFn<R, S> getFirst() {
+    return first;
+  }
+
+  public MapFn<S, T> getSecond() {
+    return second;
+  }
+
+  @Override
+  public T map(R input) {
+    return second.map(first.map(input));
+  }
+
+  @Override
+  public void cleanup(Emitter<T> emitter) {
+    first.cleanup(null);
+    second.cleanup(null);
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    first.configure(conf);
+    second.configure(conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
new file mode 100644
index 0000000..b8cc9df
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Wrapper function for converting a {@code MapFn} into a key-value pair that is
+ * used to convert from a {@code PCollection<V>} to a {@code PTable<K, V>}.
+ */
+public class ExtractKeyFn<K, V> extends MapFn<V, Pair<K, V>> {
+
+  private final MapFn<V, K> mapFn;
+
+  public ExtractKeyFn(MapFn<V, K> mapFn) {
+    this.mapFn = mapFn;
+  }
+
+  @Override
+  public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+    mapFn.setContext(context);
+  }
+  
+  @Override
+  public void initialize() {
+    mapFn.initialize();
+  }
+
+  @Override
+  public Pair<K, V> map(V input) {
+    return Pair.of(mapFn.map(input), input);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/FilterFns.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/FilterFns.java b/crunch-core/src/main/java/org/apache/crunch/fn/FilterFns.java
new file mode 100644
index 0000000..8dc4268
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/FilterFns.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.FilterFn.AndFn;
+import org.apache.crunch.FilterFn.NotFn;
+import org.apache.crunch.FilterFn.OrFn;
+
+
+/**
+ * A collection of pre-defined {@link FilterFn} implementations.
+ */
+public final class FilterFns {
+  // Note: We delegate to the deprecated implementation classes in FilterFn. When their
+  //       time is up, we just move them here.
+
+  private FilterFns() {
+    // utility class, not for instantiation
+  }
+
+  /**
+   * Accept an entry if all of the given filters accept it, using short-circuit evaluation.
+   * @param fn1 The first functions to delegate to
+   * @param fn2 The second functions to delegate to
+   * @return The composed filter function
+   */
+  public static <S> FilterFn<S> and(FilterFn<S> fn1, FilterFn<S> fn2) {
+    return new AndFn<S>(fn1, fn2);
+  }
+
+  /**
+   * Accept an entry if all of the given filters accept it, using short-circuit evaluation.
+   * @param fns The functions to delegate to (in the given order)
+   * @return The composed filter function
+   */
+  public static <S> FilterFn<S> and(FilterFn<S>... fns) {
+    return new AndFn<S>(fns);
+  }
+
+  /**
+   * Accept an entry if at least one of the given filters accept it, using short-circuit evaluation.
+   * @param fn1 The first functions to delegate to
+   * @param fn2 The second functions to delegate to
+   * @return The composed filter function
+   */
+  public static <S> FilterFn<S> or(FilterFn<S> fn1, FilterFn<S> fn2) {
+    return new OrFn<S>(fn1, fn2);
+  }
+
+  /**
+   * Accept an entry if at least one of the given filters accept it, using short-circuit evaluation.
+   * @param fns The functions to delegate to (in the given order)
+   * @return The composed filter function
+   */
+  public static <S> FilterFn<S> or(FilterFn<S>... fns) {
+    return new OrFn<S>(fns);
+  }
+
+  /**
+   * Accept an entry if the given filter <em>does not</em> accept it.
+   * @param fn The function to delegate to
+   * @return The composed filter function
+   */
+  public static <S> FilterFn<S> not(FilterFn<S> fn) {
+    return new NotFn<S>(fn);
+  }
+
+  /**
+   * Accept everything.
+   * @return A filter function that accepts everything.
+   */
+  public static <S> FilterFn<S> ACCEPT_ALL() {
+    return new AcceptAllFn<S>();
+  }
+
+  /**
+   * Reject everything.
+   * @return A filter function that rejects everything.
+   */
+  public static <S> FilterFn<S> REJECT_ALL() {
+    return not(new AcceptAllFn<S>());
+  }
+
+  private static class AcceptAllFn<S> extends FilterFn<S> {
+    @Override
+    public boolean accept(S input) {
+      return true;
+    }
+
+    @Override
+    public float scaleFactor() {
+      return 1.0f;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/IdentityFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/IdentityFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/IdentityFn.java
new file mode 100644
index 0000000..0eadb06
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/IdentityFn.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.MapFn;
+
+public class IdentityFn<T> extends MapFn<T, T> {
+
+  private static final IdentityFn<Object> INSTANCE = new IdentityFn<Object>();
+
+  @SuppressWarnings("unchecked")
+  public static <T> IdentityFn<T> getInstance() {
+    return (IdentityFn<T>) INSTANCE;
+  }
+
+  // Non-instantiable
+  private IdentityFn() {
+  }
+
+  @Override
+  public T map(T input) {
+    return input;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
new file mode 100644
index 0000000..cbaf24d
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+
+public abstract class MapKeysFn<K1, K2, V> extends DoFn<Pair<K1, V>, Pair<K2, V>> {
+
+  @Override
+  public void process(Pair<K1, V> input, Emitter<Pair<K2, V>> emitter) {
+    emitter.emit(Pair.of(map(input.first()), input.second()));
+  }
+
+  public abstract K2 map(K1 k1);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
new file mode 100644
index 0000000..b90f5ff
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+
+public abstract class MapValuesFn<K, V1, V2> extends DoFn<Pair<K, V1>, Pair<K, V2>> {
+
+  @Override
+  public void process(Pair<K, V1> input, Emitter<Pair<K, V2>> emitter) {
+    emitter.emit(Pair.of(input.first(), map(input.second())));
+  }
+
+  public abstract V2 map(V1 v);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/PairMapFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/PairMapFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/PairMapFn.java
new file mode 100644
index 0000000..9ee4336
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/PairMapFn.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> {
+
+  private MapFn<K, S> keys;
+  private MapFn<V, T> values;
+
+  public PairMapFn(MapFn<K, S> keys, MapFn<V, T> values) {
+    this.keys = keys;
+    this.values = values;
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    keys.configure(conf);
+    values.configure(conf);
+  }
+
+  @Override
+  public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+    keys.setContext(context);
+    values.setContext(context);
+  }
+
+  @Override
+  public void initialize() {
+    keys.initialize();
+    values.initialize();
+  }
+  
+  @Override
+  public Pair<S, T> map(Pair<K, V> input) {
+    return Pair.of(keys.map(input.first()), values.map(input.second()));
+  }
+
+  @Override
+  public void cleanup(Emitter<Pair<S, T>> emitter) {
+    keys.cleanup(null);
+    values.cleanup(null);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/fn/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/package-info.java b/crunch-core/src/main/java/org/apache/crunch/fn/package-info.java
new file mode 100644
index 0000000..acefdff
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Commonly used functions for manipulating collections.
+ */
+package org.apache.crunch.fn;

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
new file mode 100644
index 0000000..887c051
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.hadoop.mapreduce;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * A factory class that allows us to hide the fact that {@code TaskAttemptContext} is a class in
+ * Hadoop 1.x.x and an interface in Hadoop 2.x.x.
+ */
+@SuppressWarnings("unchecked")
+public class TaskAttemptContextFactory {
+
+  private static final Log LOG = LogFactory.getLog(TaskAttemptContextFactory.class);
+
+  private static final TaskAttemptContextFactory INSTANCE = new TaskAttemptContextFactory();
+
+  public static TaskAttemptContext create(Configuration conf, TaskAttemptID taskAttemptId) {
+    return INSTANCE.createInternal(conf, taskAttemptId);
+  }
+
+  private Constructor<TaskAttemptContext> taskAttemptConstructor;
+
+  private TaskAttemptContextFactory() {
+    Class<TaskAttemptContext> implClass = TaskAttemptContext.class;
+    if (implClass.isInterface()) {
+      try {
+        implClass = (Class<TaskAttemptContext>) Class.forName(
+            "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+      } catch (ClassNotFoundException e) {
+        LOG.fatal("Could not find TaskAttemptContextImpl class, exiting", e);
+      }
+    }
+    try {
+      this.taskAttemptConstructor = implClass.getConstructor(Configuration.class, TaskAttemptID.class);
+    } catch (Exception e) {
+      LOG.fatal("Could not access TaskAttemptContext constructor, exiting", e);
+    }
+  }
+
+  private TaskAttemptContext createInternal(Configuration conf, TaskAttemptID taskAttemptId) {
+    try {
+      return (TaskAttemptContext) taskAttemptConstructor.newInstance(conf, taskAttemptId);
+    } catch (Exception e) {
+      LOG.error("Could not construct a TaskAttemptContext instance", e);
+      return null;
+    }
+  }
+}