You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:16 UTC

[14/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Pipeline.java b/crunch/src/main/java/org/apache/crunch/Pipeline.java
deleted file mode 100644
index 84c720c..0000000
--- a/crunch/src/main/java/org/apache/crunch/Pipeline.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Manages the state of a pipeline execution.
- * 
- */
-public interface Pipeline {
-
-  /**
-   * Set the {@code Configuration} to use with this pipeline.
-   */
-  void setConfiguration(Configuration conf);
-
-  /**
-   * Returns the name of this pipeline.
-   * 
-   * @return Name of the pipeline
-   */
-  String getName();
-
-  /**
-   * Returns the {@code Configuration} instance associated with this pipeline.
-   */
-  Configuration getConfiguration();
-
-  /**
-   * Converts the given {@code Source} into a {@code PCollection} that is
-   * available to jobs run using this {@code Pipeline} instance.
-   * 
-   * @param source
-   *          The source of data
-   * @return A PCollection that references the given source
-   */
-  <T> PCollection<T> read(Source<T> source);
-
-  /**
-   * A version of the read method for {@code TableSource} instances that map to
-   * {@code PTable}s.
-   * 
-   * @param tableSource
-   *          The source of the data
-   * @return A PTable that references the given source
-   */
-  <K, V> PTable<K, V> read(TableSource<K, V> tableSource);
-
-  /**
-   * Write the given collection to the given target on the next pipeline run. The
-   * system will check to see if the target's location already exists using the
-   * {@code WriteMode.DEFAULT} rule for the given {@code Target}.
-   * 
-   * @param collection
-   *          The collection
-   * @param target
-   *          The output target
-   */
-  void write(PCollection<?> collection, Target target);
-
-  /**
-  * Write the contents of the {@code PCollection} to the given {@code Target},
-  * using the storage format specified by the target and the given
-  * {@code WriteMode} for cases where the referenced {@code Target}
-  * already exists.
-  *
-  * @param collection
-  *          The collection
-  * @param target
-  *          The target to write to
-  * @param writeMode
-  *          The strategy to use for handling existing outputs
-  */
- void write(PCollection<?> collection, Target target,
-     Target.WriteMode writeMode);
-
- /**
-   * Create the given PCollection and read the data it contains into the
-   * returned Collection instance for client use.
-   * 
-   * @param pcollection
-   *          The PCollection to materialize
-   * @return the data from the PCollection as a read-only Collection
-   */
-  <T> Iterable<T> materialize(PCollection<T> pcollection);
-
-  /**
-   * Constructs and executes a series of MapReduce jobs in order to write data
-   * to the output targets.
-   */
-  PipelineResult run();
-
-  /**
-   * Constructs and starts a series of MapReduce jobs in order ot write data to
-   * the output targets, but returns a {@code ListenableFuture} to allow clients to control
-   * job execution.
-   * @return
-   */
-  PipelineExecution runAsync();
-  
-  /**
-   * Run any remaining jobs required to generate outputs and then clean up any
-   * intermediate data files that were created in this run or previous calls to
-   * {@code run}.
-   */
-  PipelineResult done();
-
-  /**
-   * A convenience method for reading a text file.
-   */
-  PCollection<String> readTextFile(String pathName);
-
-  /**
-   * A convenience method for writing a text file.
-   */
-  <T> void writeTextFile(PCollection<T> collection, String pathName);
-
-  /**
-   * Turn on debug logging for jobs that are run from this pipeline.
-   */
-  void enableDebug();
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PipelineExecution.java b/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
deleted file mode 100644
index fc6bb91..0000000
--- a/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * A handle to allow clients to control a Crunch pipeline as it runs.
- *
- * This interface is thread-safe.
- */
-public interface PipelineExecution {
-
-  enum Status { READY, RUNNING, SUCCEEDED, FAILED, KILLED }
-
-  /** Returns the .dot file that allows a client to graph the Crunch execution plan for this
-   * pipeline.
-   */
-  String getPlanDotFile();
-
-  /** Blocks until pipeline completes or the specified waiting time elapsed. */
-   void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException;
-
-   /** Blocks until pipeline completes, i.e. {@code SUCCEEDED}, {@code FAILED} or {@code KILLED}. */
-  void waitUntilDone() throws InterruptedException;
-
-  Status getStatus();
-
-  /** Retrieve the result of a pipeline if it has been completed, otherwise {@code null}. */
-  PipelineResult getResult();
-
-  /**
-   * Kills the pipeline if it is running, no-op otherwise.
-   *
-   * This method only delivers a kill signal to the pipeline, and does not guarantee the pipeline exits on return.
-   * To wait for completely exits, use {@link #waitUntilDone()} after this call.
-   */
-  void kill() throws InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/PipelineResult.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PipelineResult.java b/crunch/src/main/java/org/apache/crunch/PipelineResult.java
deleted file mode 100644
index 90b1067..0000000
--- a/crunch/src/main/java/org/apache/crunch/PipelineResult.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.List;
-
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Counters;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * Container for the results of a call to {@code run} or {@code done} on the
- * Pipeline interface that includes details and statistics about the component
- * stages of the data pipeline.
- */
-public class PipelineResult {
-
-  public static class StageResult {
-
-    private final String stageName;
-    private final Counters counters;
-
-    public StageResult(String stageName, Counters counters) {
-      this.stageName = stageName;
-      this.counters = counters;
-    }
-
-    public String getStageName() {
-      return stageName;
-    }
-
-    public Counters getCounters() {
-      return counters;
-    }
-
-    public Counter findCounter(Enum<?> key) {
-      return counters.findCounter(key);
-    }
-
-    public long getCounterValue(Enum<?> key) {
-      return findCounter(key).getValue();
-    }
-  }
-
-  public static final PipelineResult EMPTY = new PipelineResult(ImmutableList.<StageResult> of());
-
-  private final List<StageResult> stageResults;
-
-  public PipelineResult(List<StageResult> stageResults) {
-    this.stageResults = ImmutableList.copyOf(stageResults);
-  }
-
-  public boolean succeeded() {
-    return !stageResults.isEmpty();
-  }
-
-  public List<StageResult> getStageResults() {
-    return stageResults;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/Source.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Source.java b/crunch/src/main/java/org/apache/crunch/Source.java
deleted file mode 100644
index f54d135..0000000
--- a/crunch/src/main/java/org/apache/crunch/Source.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.IOException;
-
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * A {@code Source} represents an input data set that is an input to one or more
- * MapReduce jobs.
- * 
- */
-public interface Source<T> {
-  /**
-   * Returns the {@code PType} for this source.
-   */
-  PType<T> getType();
-
-  /**
-   * Configure the given job to use this source as an input.
-   * 
-   * @param job
-   *          The job to configure
-   * @param inputId
-   *          For a multi-input job, an identifier for this input to the job
-   * @throws IOException
-   */
-  void configureSource(Job job, int inputId) throws IOException;
-
-  /**
-   * Returns the number of bytes in this {@code Source}.
-   */
-  long getSize(Configuration configuration);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/SourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/SourceTarget.java b/crunch/src/main/java/org/apache/crunch/SourceTarget.java
deleted file mode 100644
index 09c03c6..0000000
--- a/crunch/src/main/java/org/apache/crunch/SourceTarget.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-/**
- * An interface for classes that implement both the {@code Source} and the
- * {@code Target} interfaces.
- *
- */
-public interface SourceTarget<T> extends Source<T>, Target {
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/TableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/TableSource.java b/crunch/src/main/java/org/apache/crunch/TableSource.java
deleted file mode 100644
index ff27346..0000000
--- a/crunch/src/main/java/org/apache/crunch/TableSource.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.crunch.types.PTableType;
-
-/**
- * The interface {@code Source} implementations that return a {@link PTable}.
- * 
- */
-public interface TableSource<K, V> extends Source<Pair<K, V>> {
-  PTableType<K, V> getTableType();
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/TableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/TableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/TableSourceTarget.java
deleted file mode 100644
index 9b1ed34..0000000
--- a/crunch/src/main/java/org/apache/crunch/TableSourceTarget.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-/**
- * An interface for classes that implement both the {@code TableSource} and the
- * {@code Target} interfaces.
- */
-public interface TableSourceTarget<K, V> extends TableSource<K, V>, SourceTarget<Pair<K, V>> {
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/Target.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Target.java b/crunch/src/main/java/org/apache/crunch/Target.java
deleted file mode 100644
index 0a0c23d..0000000
--- a/crunch/src/main/java/org/apache/crunch/Target.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.crunch.io.OutputHandler;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A {@code Target} represents the output destination of a Crunch {@code PCollection}
- * in the context of a Crunch job.
- */
-public interface Target {
-
-  /**
-   * An enum to represent different options the client may specify
-   * for handling the case where the output path, table, etc. referenced
-   * by a {@code Target} already exists.
-   */
-  enum WriteMode {
-    /**
-     * Check to see if the output target already exists before running
-     * the pipeline, and if it does, print an error and throw an exception.
-     */
-    DEFAULT,
-    
-    /**
-     * Check to see if the output target already exists, and if it does,
-     * delete it and overwrite it with the new output (if any).
-     */
-    OVERWRITE,
-
-    /**
-     * If the output target does not exist, create it. If it does exist,
-     * add the output of this pipeline to the target. This was the
-     * behavior in Crunch up to version 0.4.0.
-     */
-    APPEND
-  }
-
-  /**
-   * Apply the given {@code WriteMode} to this {@code Target} instance.
-   * 
-   * @param writeMode The strategy for handling existing outputs
-   * @param conf The ever-useful {@code Configuration} instance
-   */
-  void handleExisting(WriteMode writeMode, Configuration conf);
-  
-  /**
-   * Checks to see if this {@code Target} instance is compatible with the
-   * given {@code PType}.
-   * 
-   * @param handler The {@link OutputHandler} that is managing the output for the job
-   * @param ptype The {@code PType} to check
-   * @return True if this Target can write data in the form of the given {@code PType},
-   * false otherwise
-   */
-  boolean accept(OutputHandler handler, PType<?> ptype);
-
-  /**
-   * Attempt to create the {@code SourceTarget} type that corresponds to this {@code Target}
-   * for the given {@code PType}, if possible. If it is not possible, return {@code null}.
-   * 
-   * @param ptype The {@code PType} to use in constructing the {@code SourceTarget}
-   * @return A new {@code SourceTarget} or null if such a {@code SourceTarget} does not exist
-   */
-  <T> SourceTarget<T> asSourceTarget(PType<T> ptype);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/Tuple.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Tuple.java b/crunch/src/main/java/org/apache/crunch/Tuple.java
deleted file mode 100644
index 4e602ff..0000000
--- a/crunch/src/main/java/org/apache/crunch/Tuple.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-/**
- * A fixed-size collection of Objects, used in Crunch for representing joins
- * between {@code PCollection}s.
- * 
- */
-public interface Tuple {
-
-  /**
-   * Returns the Object at the given index.
-   */
-  Object get(int index);
-
-  /**
-   * Returns the number of elements in this Tuple.
-   */
-  int size();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/Tuple3.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Tuple3.java b/crunch/src/main/java/org/apache/crunch/Tuple3.java
deleted file mode 100644
index 4372811..0000000
--- a/crunch/src/main/java/org/apache/crunch/Tuple3.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * A convenience class for three-element {@link Tuple}s.
- */
-public class Tuple3<V1, V2, V3> implements Tuple {
-
-  private final V1 first;
-  private final V2 second;
-  private final V3 third;
-
-  public static <A, B, C> Tuple3<A, B, C> of(A a, B b, C c) {
-    return new Tuple3<A, B, C>(a, b, c);
-  }
-
-  public Tuple3(V1 first, V2 second, V3 third) {
-    this.first = first;
-    this.second = second;
-    this.third = third;
-  }
-
-  public V1 first() {
-    return first;
-  }
-
-  public V2 second() {
-    return second;
-  }
-
-  public V3 third() {
-    return third;
-  }
-
-  public Object get(int index) {
-    switch (index) {
-    case 0:
-      return first;
-    case 1:
-      return second;
-    case 2:
-      return third;
-    default:
-      throw new ArrayIndexOutOfBoundsException();
-    }
-  }
-
-  public int size() {
-    return 3;
-  }
-
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(first).append(second).append(third).toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    Tuple3<?, ?, ?> other = (Tuple3<?, ?, ?>) obj;
-    return (first == other.first || (first != null && first.equals(other.first)))
-        && (second == other.second || (second != null && second.equals(other.second)))
-        && (third == other.third || (third != null && third.equals(other.third)));
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder("Tuple3[");
-    sb.append(first).append(",").append(second).append(",").append(third);
-    return sb.append("]").toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/Tuple4.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Tuple4.java b/crunch/src/main/java/org/apache/crunch/Tuple4.java
deleted file mode 100644
index f161371..0000000
--- a/crunch/src/main/java/org/apache/crunch/Tuple4.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * A convenience class for four-element {@link Tuple}s.
- */
-public class Tuple4<V1, V2, V3, V4> implements Tuple {
-
-  private final V1 first;
-  private final V2 second;
-  private final V3 third;
-  private final V4 fourth;
-
-  public static <A, B, C, D> Tuple4<A, B, C, D> of(A a, B b, C c, D d) {
-    return new Tuple4<A, B, C, D>(a, b, c, d);
-  }
-
-  public Tuple4(V1 first, V2 second, V3 third, V4 fourth) {
-    this.first = first;
-    this.second = second;
-    this.third = third;
-    this.fourth = fourth;
-  }
-
-  public V1 first() {
-    return first;
-  }
-
-  public V2 second() {
-    return second;
-  }
-
-  public V3 third() {
-    return third;
-  }
-
-  public V4 fourth() {
-    return fourth;
-  }
-
-  public Object get(int index) {
-    switch (index) {
-    case 0:
-      return first;
-    case 1:
-      return second;
-    case 2:
-      return third;
-    case 3:
-      return fourth;
-    default:
-      throw new ArrayIndexOutOfBoundsException();
-    }
-  }
-
-  public int size() {
-    return 4;
-  }
-
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(first).append(second).append(third).append(fourth).toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    Tuple4<?, ?, ?, ?> other = (Tuple4<?, ?, ?, ?>) obj;
-    return (first == other.first || (first != null && first.equals(other.first)))
-        && (second == other.second || (second != null && second.equals(other.second)))
-        && (third == other.third || (third != null && third.equals(other.third)))
-        && (fourth == other.fourth || (fourth != null && fourth.equals(other.fourth)));
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder("Tuple4[");
-    sb.append(first).append(",").append(second).append(",").append(third);
-    return sb.append(",").append(fourth).append("]").toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/TupleN.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/TupleN.java b/crunch/src/main/java/org/apache/crunch/TupleN.java
deleted file mode 100644
index e5eceb5..0000000
--- a/crunch/src/main/java/org/apache/crunch/TupleN.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.Arrays;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * A {@link Tuple} instance for an arbitrary number of values.
- */
-public class TupleN implements Tuple {
-
-  private final Object values[];
-
-  public static TupleN of(Object... values) {
-    return new TupleN(values);
-  }
-
-  public TupleN(Object... values) {
-    this.values = new Object[values.length];
-    System.arraycopy(values, 0, this.values, 0, values.length);
-  }
-
-  public Object get(int index) {
-    return values[index];
-  }
-
-  public int size() {
-    return values.length;
-  }
-
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    for (Object v : values) {
-      hcb.append(v);
-    }
-    return hcb.toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    TupleN other = (TupleN) obj;
-    return Arrays.equals(this.values, other.values);
-  }
-
-  @Override
-  public String toString() {
-    return Arrays.toString(values);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java
deleted file mode 100644
index 0ac79e2..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java
+++ /dev/null
@@ -1,1111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import java.math.BigInteger;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-
-import org.apache.crunch.Aggregator;
-import org.apache.crunch.CombineFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.PGroupedTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import org.apache.crunch.util.Tuples;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-
-/**
- * A collection of pre-defined {@link org.apache.crunch.Aggregator}s.
- *
- * <p>The factory methods of this class return {@link org.apache.crunch.Aggregator}
- * instances that you can use to combine the values of a {@link PGroupedTable}.
- * In most cases, they turn a multimap (multiple entries per key) into a map (one
- * entry per key).</p>
- *
- * <p><strong>Note</strong>: When using composed aggregators, like those built by the
- * {@link #pairAggregator(Aggregator, Aggregator) pairAggregator()}
- * factory method, you typically don't want to put in the same child aggregator more than once,
- * even if all child aggregators have the same type. In most cases, this is what you want:</p>
- *
- * <pre>
- *   PTable&lt;K, Long&gt; result = groupedTable.combineValues(
- *      pairAggregator(SUM_LONGS(), SUM_LONGS())
- *   );
- * </pre>
- */
-public final class Aggregators {
-
-  private Aggregators() {
-    // utility class, not for instantiation
-  }
-
-  /**
-   * Sum up all {@code long} values.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Long> SUM_LONGS() {
-    return new SumLongs();
-  }
-
-  /**
-   * Sum up all {@code int} values.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Integer> SUM_INTS() {
-    return new SumInts();
-  }
-
-  /**
-   * Sum up all {@code float} values.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Float> SUM_FLOATS() {
-    return new SumFloats();
-  }
-
-  /**
-   * Sum up all {@code double} values.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Double> SUM_DOUBLES() {
-    return new SumDoubles();
-  }
-
-  /**
-   * Sum up all {@link BigInteger} values.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<BigInteger> SUM_BIGINTS() {
-    return new SumBigInts();
-  }
-
-  /**
-   * Return the maximum of all given {@code long} values.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Long> MAX_LONGS() {
-    return new MaxLongs();
-  }
-
-  /**
-   * Return the {@code n} largest {@code long} values (or fewer if there are fewer
-   * values than {@code n}).
-   * @param n The number of values to return
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Long> MAX_LONGS(int n) {
-    return new MaxLongs();
-  }
-
-  /**
-   * Return the maximum of all given {@code int} values.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Integer> MAX_INTS() {
-    return new MaxInts();
-  }
-
-  /**
-   * Return the {@code n} largest {@code int} values (or fewer if there are fewer
-   * values than {@code n}).
-   * @param n The number of values to return
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Integer> MAX_INTS(int n) {
-    return new MaxNAggregator<Integer>(n);
-  }
-
-  /**
-   * Return the maximum of all given {@code float} values.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Float> MAX_FLOATS() {
-    return new MaxFloats();
-  }
-
-  /**
-   * Return the {@code n} largest {@code float} values (or fewer if there are fewer
-   * values than {@code n}).
-   * @param n The number of values to return
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Float> MAX_FLOATS(int n) {
-    return new MaxNAggregator<Float>(n);
-  }
-
-  /**
-   * Return the maximum of all given {@code double} values.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Double> MAX_DOUBLES() {
-    return new MaxDoubles();
-  }
-
-  /**
-   * Return the {@code n} largest {@code double} values (or fewer if there are fewer
-   * values than {@code n}).
-   * @param n The number of values to return
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Double> MAX_DOUBLES(int n) {
-    return new MaxNAggregator<Double>(n);
-  }
-
-  /**
-   * Return the maximum of all given {@link BigInteger} values.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<BigInteger> MAX_BIGINTS() {
-    return new MaxBigInts();
-  }
-
-  /**
-   * Return the {@code n} largest {@link BigInteger} values (or fewer if there are fewer
-   * values than {@code n}).
-   * @param n The number of values to return
-   * @return The newly constructed instance
-   */
-  public static Aggregator<BigInteger> MAX_BIGINTS(int n) {
-    return new MaxNAggregator<BigInteger>(n);
-  }
-
-  /**
-   * Return the {@code n} largest values (or fewer if there are fewer
-   * values than {@code n}).
-   * @param n The number of values to return
-   * @param cls The type of the values to aggregate (must implement {@link Comparable}!)
-   * @return The newly constructed instance
-   */
-  public static <V extends Comparable<V>> Aggregator<V> MAX_N(int n, Class<V> cls) {
-    return new MaxNAggregator<V>(n);
-  }
-
-  /**
-   * Return the minimum of all given {@code long} values.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Long> MIN_LONGS() {
-    return new MinLongs();
-  }
-
-  /**
-   * Return the {@code n} smallest {@code long} values (or fewer if there are fewer
-   * values than {@code n}).
-   * @param n The number of values to return
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Long> MIN_LONGS(int n) {
-    return new MinNAggregator<Long>(n);
-  }
-
-  /**
-   * Return the minimum of all given {@code int} values.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Integer> MIN_INTS() {
-    return new MinInts();
-  }
-
-  /**
-   * Return the {@code n} smallest {@code int} values (or fewer if there are fewer
-   * values than {@code n}).
-   * @param n The number of values to return
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Integer> MIN_INTS(int n) {
-    return new MinNAggregator<Integer>(n);
-  }
-
-  /**
-   * Return the minimum of all given {@code float} values.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Float> MIN_FLOATS() {
-    return new MinFloats();
-  }
-
-  /**
-   * Return the {@code n} smallest {@code float} values (or fewer if there are fewer
-   * values than {@code n}).
-   * @param n The number of values to return
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Float> MIN_FLOATS(int n) {
-    return new MinNAggregator<Float>(n);
-  }
-
-  /**
-   * Return the minimum of all given {@code double} values.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Double> MIN_DOUBLES() {
-    return new MinDoubles();
-  }
-
-  /**
-   * Return the {@code n} smallest {@code double} values (or fewer if there are fewer
-   * values than {@code n}).
-   * @param n The number of values to return
-   * @return The newly constructed instance
-   */
-  public static Aggregator<Double> MIN_DOUBLES(int n) {
-    return new MinNAggregator<Double>(n);
-  }
-
-  /**
-   * Return the minimum of all given {@link BigInteger} values.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<BigInteger> MIN_BIGINTS() {
-    return new MinBigInts();
-  }
-
-  /**
-   * Return the {@code n} smallest {@link BigInteger} values (or fewer if there are fewer
-   * values than {@code n}).
-   * @param n The number of values to return
-   * @return The newly constructed instance
-   */
-  public static Aggregator<BigInteger> MIN_BIGINTS(int n) {
-    return new MinNAggregator<BigInteger>(n);
-  }
-
-  /**
-   * Return the {@code n} smallest values (or fewer if there are fewer
-   * values than {@code n}).
-   * @param n The number of values to return
-   * @param cls The type of the values to aggregate (must implement {@link Comparable}!)
-   * @return The newly constructed instance
-   */
-  public static <V extends Comparable<V>> Aggregator<V> MIN_N(int n, Class<V> cls) {
-    return new MinNAggregator<V>(n);
-  }
-
-  /**
-   * Return the first {@code n} values (or fewer if there are fewer values than {@code n}).
-   *
-   * @param n The number of values to return
-   * @return The newly constructed instance
-   */
-  public static <V> Aggregator<V> FIRST_N(int n) {
-    return new FirstNAggregator<V>(n);
-  }
-
-  /**
-   * Return the last {@code n} values (or fewer if there are fewer values than {@code n}).
-   *
-   * @param n The number of values to return
-   * @return The newly constructed instance
-   */
-  public static <V> Aggregator<V> LAST_N(int n) {
-    return new LastNAggregator<V>(n);
-  }
-  
-  /**
-   * Concatenate strings, with a separator between strings. There
-   * is no limits of length for the concatenated string.
-   *
-   * <p><em>Note: String concatenation is not commutative, which means the
-   * result of the aggregation is not deterministic!</em></p>
-   *
-   * @param separator
-   *            the separator which will be appended between each string
-   * @param skipNull
-   *            define if we should skip null values. Throw
-   *            NullPointerException if set to false and there is a null
-   *            value.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<String> STRING_CONCAT(String separator, boolean skipNull) {
-    return new StringConcatAggregator(separator, skipNull);
-  }
-
-  /**
-   * Concatenate strings, with a separator between strings. You can specify
-   * the maximum length of the output string and of the input strings, if
-   * they are &gt; 0. If a value is &lt;= 0, there is no limit.
-   *
-   * <p>Any too large string (or any string which would made the output too
-   * large) will be silently discarded.</p>
-   *
-   * <p><em>Note: String concatenation is not commutative, which means the
-   * result of the aggregation is not deterministic!</em></p>
-   *
-   * @param separator
-   *            the separator which will be appended between each string
-   * @param skipNull
-   *            define if we should skip null values. Throw
-   *            NullPointerException if set to false and there is a null
-   *            value.
-   * @param maxOutputLength
-   *            the maximum length of the output string. If it's set &lt;= 0,
-   *            there is no limit. The number of characters of the output
-   *            string will be &lt; maxOutputLength.
-   * @param maxInputLength
-   *            the maximum length of the input strings. If it's set <= 0,
-   *            there is no limit. The number of characters of the input string
-   *            will be &lt; maxInputLength to be concatenated.
-   * @return The newly constructed instance
-   */
-  public static Aggregator<String> STRING_CONCAT(String separator, boolean skipNull,
-      long maxOutputLength, long maxInputLength) {
-    return new StringConcatAggregator(separator, skipNull, maxOutputLength, maxInputLength);
-  }
-
-  /**
-   * Collect the unique elements of the input, as defined by the {@code equals} method for
-   * the input objects. No guarantees are made about the order in which the final elements
-   * will be returned.
-   * 
-   * @return The newly constructed instance
-   */
-  public static <V> Aggregator<V> UNIQUE_ELEMENTS() {
-    return new SetAggregator<V>();
-  }
-  
-  /**
-   * Collect a sample of unique elements from the input, where 'unique' is defined by
-   * the {@code equals} method for the input objects. No guarantees are made about which
-   * elements will be returned, simply that there will not be any more than the given sample
-   * size for any key.
-   * 
-   * @param maximumSampleSize The maximum number of unique elements to return per key
-   * @return The newly constructed instance
-   */
-  public static <V> Aggregator<V> SAMPLE_UNIQUE_ELEMENTS(int maximumSampleSize) {
-    return new SetAggregator<V>(maximumSampleSize);
-  }
-  
-  /**
-   * Apply separate aggregators to each component of a {@link Pair}.
-   */
-  public static <V1, V2> Aggregator<Pair<V1, V2>> pairAggregator(
-      Aggregator<V1> a1, Aggregator<V2> a2) {
-    return new PairAggregator<V1, V2>(a1, a2);
-  }
-
-  /**
-   * Apply separate aggregators to each component of a {@link Tuple3}.
-   */
-  public static <V1, V2, V3> Aggregator<Tuple3<V1, V2, V3>> tripAggregator(
-      Aggregator<V1> a1, Aggregator<V2> a2, Aggregator<V3> a3) {
-    return new TripAggregator<V1, V2, V3>(a1, a2, a3);
-  }
-
-  /**
-   * Apply separate aggregators to each component of a {@link Tuple4}.
-   */
-  public static <V1, V2, V3, V4> Aggregator<Tuple4<V1, V2, V3, V4>> quadAggregator(
-      Aggregator<V1> a1, Aggregator<V2> a2, Aggregator<V3> a3, Aggregator<V4> a4) {
-    return new QuadAggregator<V1, V2, V3, V4>(a1, a2, a3, a4);
-  }
-
-  /**
-   * Apply separate aggregators to each component of a {@link Tuple}.
-   */
-  public static Aggregator<TupleN> tupleAggregator(Aggregator<?>... aggregators) {
-    return new TupleNAggregator(aggregators);
-  }
-
-  /**
-   * Wrap a {@link CombineFn} adapter around the given aggregator.
-   *
-   * @param aggregator The instance to wrap
-   * @return A {@link CombineFn} delegating to {@code aggregator}
-   */
-  public static final <K, V> CombineFn<K, V> toCombineFn(Aggregator<V> aggregator) {
-    return new AggregatorCombineFn<K, V>(aggregator);
-  }
-
-  /**
-   * Base class for aggregators that do not require any initialization.
-   */
-  public static abstract class SimpleAggregator<T> implements Aggregator<T> {
-    @Override
-    public void initialize(Configuration conf) {
-      // No-op
-    }
-  }
-
-  /**
-   * A {@code CombineFn} that delegates all of the actual work to an
-   * {@code Aggregator} instance.
-   */
-  private static class AggregatorCombineFn<K, V> extends CombineFn<K, V> {
-    // TODO: Has to be fully qualified until CombineFn.Aggregator can be removed.
-    private final org.apache.crunch.Aggregator<V> aggregator;
-
-    public AggregatorCombineFn(org.apache.crunch.Aggregator<V> aggregator) {
-      this.aggregator = aggregator;
-    }
-
-    @Override
-    public void initialize() {
-      aggregator.initialize(getConfiguration());
-    }
-
-    @Override
-    public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
-      aggregator.reset();
-      for (V v : input.second()) {
-        aggregator.update(v);
-      }
-      for (V v : aggregator.results()) {
-        emitter.emit(Pair.of(input.first(), v));
-      }
-    }
-  }
-
-  private static class SumLongs extends SimpleAggregator<Long> {
-    private long sum = 0;
-
-    @Override
-    public void reset() {
-      sum = 0;
-    }
-
-    @Override
-    public void update(Long next) {
-      sum += next;
-    }
-
-    @Override
-    public Iterable<Long> results() {
-      return ImmutableList.of(sum);
-    }
-  }
-
-  private static class SumInts extends SimpleAggregator<Integer> {
-    private int sum = 0;
-
-    @Override
-    public void reset() {
-      sum = 0;
-    }
-
-    @Override
-    public void update(Integer next) {
-      sum += next;
-    }
-
-    @Override
-    public Iterable<Integer> results() {
-      return ImmutableList.of(sum);
-    }
-  }
-
-  private static class SumFloats extends SimpleAggregator<Float> {
-    private float sum = 0;
-
-    @Override
-    public void reset() {
-      sum = 0f;
-    }
-
-    @Override
-    public void update(Float next) {
-      sum += next;
-    }
-
-    @Override
-    public Iterable<Float> results() {
-      return ImmutableList.of(sum);
-    }
-  }
-
-  private static class SumDoubles extends SimpleAggregator<Double> {
-    private double sum = 0;
-
-    @Override
-    public void reset() {
-      sum = 0f;
-    }
-
-    @Override
-    public void update(Double next) {
-      sum += next;
-    }
-
-    @Override
-    public Iterable<Double> results() {
-      return ImmutableList.of(sum);
-    }
-  }
-
-  private static class SumBigInts extends SimpleAggregator<BigInteger> {
-    private BigInteger sum = BigInteger.ZERO;
-
-    @Override
-    public void reset() {
-      sum = BigInteger.ZERO;
-    }
-
-    @Override
-    public void update(BigInteger next) {
-      sum = sum.add(next);
-    }
-
-    @Override
-    public Iterable<BigInteger> results() {
-      return ImmutableList.of(sum);
-    }
-  }
-
-  private static class MaxLongs extends SimpleAggregator<Long> {
-    private Long max = null;
-
-    @Override
-    public void reset() {
-      max = null;
-    }
-
-    @Override
-    public void update(Long next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-
-    @Override
-    public Iterable<Long> results() {
-      return ImmutableList.of(max);
-    }
-  }
-
-  private static class MaxInts extends SimpleAggregator<Integer> {
-    private Integer max = null;
-
-    @Override
-    public void reset() {
-      max = null;
-    }
-
-    @Override
-    public void update(Integer next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-
-    @Override
-    public Iterable<Integer> results() {
-      return ImmutableList.of(max);
-    }
-  }
-
-  private static class MaxFloats extends SimpleAggregator<Float> {
-    private Float max = null;
-
-    @Override
-    public void reset() {
-      max = null;
-    }
-
-    @Override
-    public void update(Float next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-
-    @Override
-    public Iterable<Float> results() {
-      return ImmutableList.of(max);
-    }
-  }
-
-  private static class MaxDoubles extends SimpleAggregator<Double> {
-    private Double max = null;
-
-    @Override
-    public void reset() {
-      max = null;
-    }
-
-    @Override
-    public void update(Double next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-
-    @Override
-    public Iterable<Double> results() {
-      return ImmutableList.of(max);
-    }
-  }
-
-  private static class MaxBigInts extends SimpleAggregator<BigInteger> {
-    private BigInteger max = null;
-
-    @Override
-    public void reset() {
-      max = null;
-    }
-
-    @Override
-    public void update(BigInteger next) {
-      if (max == null || max.compareTo(next) < 0) {
-        max = next;
-      }
-    }
-
-    @Override
-    public Iterable<BigInteger> results() {
-      return ImmutableList.of(max);
-    }
-  }
-
-  private static class MinLongs extends SimpleAggregator<Long> {
-    private Long min = null;
-
-    @Override
-    public void reset() {
-      min = null;
-    }
-
-    @Override
-    public void update(Long next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-
-    @Override
-    public Iterable<Long> results() {
-      return ImmutableList.of(min);
-    }
-  }
-
-  private static class MinInts extends SimpleAggregator<Integer> {
-    private Integer min = null;
-
-    @Override
-    public void reset() {
-      min = null;
-    }
-
-    @Override
-    public void update(Integer next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-
-    @Override
-    public Iterable<Integer> results() {
-      return ImmutableList.of(min);
-    }
-  }
-
-  private static class MinFloats extends SimpleAggregator<Float> {
-    private Float min = null;
-
-    @Override
-    public void reset() {
-      min = null;
-    }
-
-    @Override
-    public void update(Float next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-
-    @Override
-    public Iterable<Float> results() {
-      return ImmutableList.of(min);
-    }
-  }
-
-  private static class MinDoubles extends SimpleAggregator<Double> {
-    private Double min = null;
-
-    @Override
-    public void reset() {
-      min = null;
-    }
-
-    @Override
-    public void update(Double next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-
-    @Override
-    public Iterable<Double> results() {
-      return ImmutableList.of(min);
-    }
-  }
-
-  private static class MinBigInts extends SimpleAggregator<BigInteger> {
-    private BigInteger min = null;
-
-    @Override
-    public void reset() {
-      min = null;
-    }
-
-    @Override
-    public void update(BigInteger next) {
-      if (min == null || min.compareTo(next) > 0) {
-        min = next;
-      }
-    }
-
-    @Override
-    public Iterable<BigInteger> results() {
-      return ImmutableList.of(min);
-    }
-  }
-
-  private static class MaxNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
-    private final int arity;
-    private transient SortedSet<V> elements;
-
-    public MaxNAggregator(int arity) {
-      this.arity = arity;
-    }
-
-    @Override
-    public void reset() {
-      if (elements == null) {
-        elements = Sets.newTreeSet();
-      } else {
-        elements.clear();
-      }
-    }
-
-    @Override
-    public void update(V value) {
-      if (elements.size() < arity) {
-        elements.add(value);
-      } else if (value.compareTo(elements.first()) > 0) {
-        elements.remove(elements.first());
-        elements.add(value);
-      }
-    }
-
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-
-  private static class MinNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
-    private final int arity;
-    private transient SortedSet<V> elements;
-
-    public MinNAggregator(int arity) {
-      this.arity = arity;
-    }
-
-    @Override
-    public void reset() {
-      if (elements == null) {
-        elements = Sets.newTreeSet();
-      } else {
-        elements.clear();
-      }
-    }
-
-    @Override
-    public void update(V value) {
-      if (elements.size() < arity) {
-        elements.add(value);
-      } else if (value.compareTo(elements.last()) < 0) {
-        elements.remove(elements.last());
-        elements.add(value);
-      }
-    }
-
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-
-  private static class FirstNAggregator<V> extends SimpleAggregator<V> {
-    private final int arity;
-    private final List<V> elements;
-
-    public FirstNAggregator(int arity) {
-      this.arity = arity;
-      this.elements = Lists.newArrayList();
-    }
-
-    @Override
-    public void reset() {
-      elements.clear();
-    }
-
-    @Override
-    public void update(V value) {
-      if (elements.size() < arity) {
-        elements.add(value);
-      }
-    }
-
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-
-  private static class LastNAggregator<V> extends SimpleAggregator<V> {
-    private final int arity;
-    private final LinkedList<V> elements;
-
-    public LastNAggregator(int arity) {
-      this.arity = arity;
-      this.elements = Lists.newLinkedList();
-    }
-
-    @Override
-    public void reset() {
-      elements.clear();
-    }
-
-    @Override
-    public void update(V value) {
-      elements.add(value);
-      if (elements.size() == arity + 1) {
-        elements.removeFirst();
-      }
-    }
-
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-
-  private static class StringConcatAggregator extends SimpleAggregator<String> {
-    private final String separator;
-    private final boolean skipNulls;
-    private final long maxOutputLength;
-    private final long maxInputLength;
-    private long currentLength;
-    private final LinkedList<String> list = new LinkedList<String>();
-
-    private transient Joiner joiner;
-
-    public StringConcatAggregator(final String separator, final boolean skipNulls) {
-      this.separator = separator;
-      this.skipNulls = skipNulls;
-      this.maxInputLength = 0;
-      this.maxOutputLength = 0;
-    }
-
-    public StringConcatAggregator(final String separator, final boolean skipNull, final long maxOutputLength, final long maxInputLength) {
-      this.separator = separator;
-      this.skipNulls = skipNull;
-      this.maxOutputLength = maxOutputLength;
-      this.maxInputLength = maxInputLength;
-      this.currentLength = -separator.length();
-    }
-
-    @Override
-    public void reset() {
-      if (joiner == null) {
-        joiner = skipNulls ? Joiner.on(separator).skipNulls() : Joiner.on(separator);
-      }
-      currentLength = -separator.length();
-      list.clear();
-    }
-
-    @Override
-    public void update(final String next) {
-      long length = (next == null) ? 0 : next.length() + separator.length();
-      if (maxOutputLength > 0 && currentLength + length > maxOutputLength || maxInputLength > 0 && next.length() > maxInputLength) {
-        return;
-      }
-      if (maxOutputLength > 0) {
-        currentLength += length;
-      }
-      list.add(next);
-    }
-
-    @Override
-    public Iterable<String> results() {
-      return ImmutableList.of(joiner.join(list));
-    }
-  }
-
-
-  private static abstract class TupleAggregator<T> implements Aggregator<T> {
-    private final List<Aggregator<Object>> aggregators;
-
-    @SuppressWarnings("unchecked")
-    public TupleAggregator(Aggregator<?>... aggregators) {
-      this.aggregators = Lists.newArrayList();
-      for (Aggregator<?> a : aggregators) {
-        this.aggregators.add((Aggregator<Object>) a);
-      }
-    }
-
-    @Override
-    public void initialize(Configuration configuration) {
-      for (Aggregator<?> a : aggregators) {
-        a.initialize(configuration);
-      }
-    }
-
-    @Override
-    public void reset() {
-      for (Aggregator<?> a : aggregators) {
-        a.reset();
-      }
-    }
-
-    protected void updateTuple(Tuple t) {
-      for (int i = 0; i < aggregators.size(); i++) {
-        aggregators.get(i).update(t.get(i));
-      }
-    }
-
-    protected Iterable<Object> results(int index) {
-      return aggregators.get(index).results();
-    }
-  }
-
-  private static class PairAggregator<V1, V2> extends TupleAggregator<Pair<V1, V2>> {
-
-    public PairAggregator(Aggregator<V1> a1, Aggregator<V2> a2) {
-      super(a1, a2);
-    }
-
-    @Override
-    public void update(Pair<V1, V2> value) {
-      updateTuple(value);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public Iterable<Pair<V1, V2>> results() {
-      return new Tuples.PairIterable<V1, V2>((Iterable<V1>) results(0), (Iterable<V2>) results(1));
-    }
-  }
-
-  private static class TripAggregator<A, B, C> extends TupleAggregator<Tuple3<A, B, C>> {
-
-    public TripAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3) {
-      super(a1, a2, a3);
-    }
-
-    @Override
-    public void update(Tuple3<A, B, C> value) {
-      updateTuple(value);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public Iterable<Tuple3<A, B, C>> results() {
-      return new Tuples.TripIterable<A, B, C>((Iterable<A>) results(0), (Iterable<B>) results(1),
-          (Iterable<C>) results(2));
-    }
-  }
-
-  private static class QuadAggregator<A, B, C, D> extends TupleAggregator<Tuple4<A, B, C, D>> {
-
-    public QuadAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3, Aggregator<D> a4) {
-      super(a1, a2, a3, a4);
-    }
-
-    @Override
-    public void update(Tuple4<A, B, C, D> value) {
-      updateTuple(value);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public Iterable<Tuple4<A, B, C, D>> results() {
-      return new Tuples.QuadIterable<A, B, C, D>((Iterable<A>) results(0), (Iterable<B>) results(1),
-          (Iterable<C>) results(2), (Iterable<D>) results(3));
-    }
-  }
-
-  private static class TupleNAggregator extends TupleAggregator<TupleN> {
-    private final int size;
-
-    public TupleNAggregator(Aggregator<?>... aggregators) {
-      super(aggregators);
-      size = aggregators.length;
-    }
-
-    @Override
-    public void update(TupleN value) {
-      updateTuple(value);
-    }
-
-    @Override
-    public Iterable<TupleN> results() {
-      Iterable<?>[] iterables = new Iterable[size];
-      for (int i = 0; i < size; i++) {
-        iterables[i] = results(i);
-      }
-      return new Tuples.TupleNIterable(iterables);
-    }
-  }
-
-  private static class SetAggregator<V> extends SimpleAggregator<V> {
-    private final Set<V> elements;
-    private final int sizeLimit;
-    
-    public SetAggregator() {
-      this(-1);
-    }
-    
-    public SetAggregator(int sizeLimit) {
-      this.elements = Sets.newHashSet();
-      this.sizeLimit = sizeLimit;
-    }
-    
-    @Override
-    public void reset() {
-      elements.clear();
-    }
-
-    @Override
-    public void update(V value) {
-      if (sizeLimit == -1 || elements.size() < sizeLimit) {
-        elements.add(value);
-      }
-    }
-
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
deleted file mode 100644
index 2a8e7d9..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.Emitter;
-import org.apache.crunch.MapFn;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-public class CompositeMapFn<R, S, T> extends MapFn<R, T> {
-
-  private final MapFn<R, S> first;
-  private final MapFn<S, T> second;
-
-  public CompositeMapFn(MapFn<R, S> first, MapFn<S, T> second) {
-    this.first = first;
-    this.second = second;
-  }
-
-  @Override
-  public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-    first.setContext(context);
-    second.setContext(context);
-  }
-  
-  @Override
-  public void initialize() {
-    first.initialize();
-    second.initialize();
-  }
-
-  public MapFn<R, S> getFirst() {
-    return first;
-  }
-
-  public MapFn<S, T> getSecond() {
-    return second;
-  }
-
-  @Override
-  public T map(R input) {
-    return second.map(first.map(input));
-  }
-
-  @Override
-  public void cleanup(Emitter<T> emitter) {
-    first.cleanup(null);
-    second.cleanup(null);
-  }
-
-  @Override
-  public void configure(Configuration conf) {
-    first.configure(conf);
-    second.configure(conf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java b/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
deleted file mode 100644
index b8cc9df..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-/**
- * Wrapper function for converting a {@code MapFn} into a key-value pair that is
- * used to convert from a {@code PCollection<V>} to a {@code PTable<K, V>}.
- */
-public class ExtractKeyFn<K, V> extends MapFn<V, Pair<K, V>> {
-
-  private final MapFn<V, K> mapFn;
-
-  public ExtractKeyFn(MapFn<V, K> mapFn) {
-    this.mapFn = mapFn;
-  }
-
-  @Override
-  public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-    mapFn.setContext(context);
-  }
-  
-  @Override
-  public void initialize() {
-    mapFn.initialize();
-  }
-
-  @Override
-  public Pair<K, V> map(V input) {
-    return Pair.of(mapFn.map(input), input);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java b/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java
deleted file mode 100644
index 8dc4268..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.FilterFn;
-import org.apache.crunch.FilterFn.AndFn;
-import org.apache.crunch.FilterFn.NotFn;
-import org.apache.crunch.FilterFn.OrFn;
-
-
-/**
- * A collection of pre-defined {@link FilterFn} implementations.
- */
-public final class FilterFns {
-  // Note: We delegate to the deprecated implementation classes in FilterFn. When their
-  //       time is up, we just move them here.
-
-  private FilterFns() {
-    // utility class, not for instantiation
-  }
-
-  /**
-   * Accept an entry if all of the given filters accept it, using short-circuit evaluation.
-   * @param fn1 The first functions to delegate to
-   * @param fn2 The second functions to delegate to
-   * @return The composed filter function
-   */
-  public static <S> FilterFn<S> and(FilterFn<S> fn1, FilterFn<S> fn2) {
-    return new AndFn<S>(fn1, fn2);
-  }
-
-  /**
-   * Accept an entry if all of the given filters accept it, using short-circuit evaluation.
-   * @param fns The functions to delegate to (in the given order)
-   * @return The composed filter function
-   */
-  public static <S> FilterFn<S> and(FilterFn<S>... fns) {
-    return new AndFn<S>(fns);
-  }
-
-  /**
-   * Accept an entry if at least one of the given filters accept it, using short-circuit evaluation.
-   * @param fn1 The first functions to delegate to
-   * @param fn2 The second functions to delegate to
-   * @return The composed filter function
-   */
-  public static <S> FilterFn<S> or(FilterFn<S> fn1, FilterFn<S> fn2) {
-    return new OrFn<S>(fn1, fn2);
-  }
-
-  /**
-   * Accept an entry if at least one of the given filters accept it, using short-circuit evaluation.
-   * @param fns The functions to delegate to (in the given order)
-   * @return The composed filter function
-   */
-  public static <S> FilterFn<S> or(FilterFn<S>... fns) {
-    return new OrFn<S>(fns);
-  }
-
-  /**
-   * Accept an entry if the given filter <em>does not</em> accept it.
-   * @param fn The function to delegate to
-   * @return The composed filter function
-   */
-  public static <S> FilterFn<S> not(FilterFn<S> fn) {
-    return new NotFn<S>(fn);
-  }
-
-  /**
-   * Accept everything.
-   * @return A filter function that accepts everything.
-   */
-  public static <S> FilterFn<S> ACCEPT_ALL() {
-    return new AcceptAllFn<S>();
-  }
-
-  /**
-   * Reject everything.
-   * @return A filter function that rejects everything.
-   */
-  public static <S> FilterFn<S> REJECT_ALL() {
-    return not(new AcceptAllFn<S>());
-  }
-
-  private static class AcceptAllFn<S> extends FilterFn<S> {
-    @Override
-    public boolean accept(S input) {
-      return true;
-    }
-
-    @Override
-    public float scaleFactor() {
-      return 1.0f;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java b/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java
deleted file mode 100644
index 0eadb06..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.MapFn;
-
-public class IdentityFn<T> extends MapFn<T, T> {
-
-  private static final IdentityFn<Object> INSTANCE = new IdentityFn<Object>();
-
-  @SuppressWarnings("unchecked")
-  public static <T> IdentityFn<T> getInstance() {
-    return (IdentityFn<T>) INSTANCE;
-  }
-
-  // Non-instantiable
-  private IdentityFn() {
-  }
-
-  @Override
-  public T map(T input) {
-    return input;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java b/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java
deleted file mode 100644
index cbaf24d..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-
-public abstract class MapKeysFn<K1, K2, V> extends DoFn<Pair<K1, V>, Pair<K2, V>> {
-
-  @Override
-  public void process(Pair<K1, V> input, Emitter<Pair<K2, V>> emitter) {
-    emitter.emit(Pair.of(map(input.first()), input.second()));
-  }
-
-  public abstract K2 map(K1 k1);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java b/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java
deleted file mode 100644
index b90f5ff..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-
-public abstract class MapValuesFn<K, V1, V2> extends DoFn<Pair<K, V1>, Pair<K, V2>> {
-
-  @Override
-  public void process(Pair<K, V1> input, Emitter<Pair<K, V2>> emitter) {
-    emitter.emit(Pair.of(input.first(), map(input.second())));
-  }
-
-  public abstract V2 map(V1 v);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
deleted file mode 100644
index 9ee4336..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.Emitter;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> {
-
-  private MapFn<K, S> keys;
-  private MapFn<V, T> values;
-
-  public PairMapFn(MapFn<K, S> keys, MapFn<V, T> values) {
-    this.keys = keys;
-    this.values = values;
-  }
-
-  @Override
-  public void configure(Configuration conf) {
-    keys.configure(conf);
-    values.configure(conf);
-  }
-
-  @Override
-  public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-    keys.setContext(context);
-    values.setContext(context);
-  }
-
-  @Override
-  public void initialize() {
-    keys.initialize();
-    values.initialize();
-  }
-  
-  @Override
-  public Pair<S, T> map(Pair<K, V> input) {
-    return Pair.of(keys.map(input.first()), values.map(input.second()));
-  }
-
-  @Override
-  public void cleanup(Emitter<Pair<S, T>> emitter) {
-    keys.cleanup(null);
-    values.cleanup(null);
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/package-info.java b/crunch/src/main/java/org/apache/crunch/fn/package-info.java
deleted file mode 100644
index acefdff..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Commonly used functions for manipulating collections.
- */
-package org.apache.crunch.fn;

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
deleted file mode 100644
index 887c051..0000000
--- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.hadoop.mapreduce;
-
-import java.lang.reflect.Constructor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-/**
- * A factory class that allows us to hide the fact that {@code TaskAttemptContext} is a class in
- * Hadoop 1.x.x and an interface in Hadoop 2.x.x.
- */
-@SuppressWarnings("unchecked")
-public class TaskAttemptContextFactory {
-
-  private static final Log LOG = LogFactory.getLog(TaskAttemptContextFactory.class);
-
-  private static final TaskAttemptContextFactory INSTANCE = new TaskAttemptContextFactory();
-
-  public static TaskAttemptContext create(Configuration conf, TaskAttemptID taskAttemptId) {
-    return INSTANCE.createInternal(conf, taskAttemptId);
-  }
-
-  private Constructor<TaskAttemptContext> taskAttemptConstructor;
-
-  private TaskAttemptContextFactory() {
-    Class<TaskAttemptContext> implClass = TaskAttemptContext.class;
-    if (implClass.isInterface()) {
-      try {
-        implClass = (Class<TaskAttemptContext>) Class.forName(
-            "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
-      } catch (ClassNotFoundException e) {
-        LOG.fatal("Could not find TaskAttemptContextImpl class, exiting", e);
-      }
-    }
-    try {
-      this.taskAttemptConstructor = implClass.getConstructor(Configuration.class, TaskAttemptID.class);
-    } catch (Exception e) {
-      LOG.fatal("Could not access TaskAttemptContext constructor, exiting", e);
-    }
-  }
-
-  private TaskAttemptContext createInternal(Configuration conf, TaskAttemptID taskAttemptId) {
-    try {
-      return (TaskAttemptContext) taskAttemptConstructor.newInstance(conf, taskAttemptId);
-    } catch (Exception e) {
-      LOG.error("Could not construct a TaskAttemptContext instance", e);
-      return null;
-    }
-  }
-}