You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:17 UTC

[15/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/resources/urls.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/urls.txt b/crunch/src/it/resources/urls.txt
deleted file mode 100644
index 827e711..0000000
--- a/crunch/src/it/resources/urls.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-www.A.com	www.B.com
-www.A.com	www.C.com
-www.A.com	www.D.com
-www.A.com	www.E.com
-www.B.com	www.D.com
-www.B.com	www.E.com
-www.C.com	www.D.com
-www.D.com	www.B.com
-www.E.com	www.A.com
-www.F.com	www.B.com
-www.F.com	www.C.com

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/Aggregator.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Aggregator.java b/crunch/src/main/java/org/apache/crunch/Aggregator.java
deleted file mode 100644
index 432452b..0000000
--- a/crunch/src/main/java/org/apache/crunch/Aggregator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.Serializable;
-
-import org.apache.hadoop.conf.Configuration;
-
-
-/**
- * Aggregate a sequence of values into a possibly smaller sequence of the same type.
- *
- * <p>In most cases, an Aggregator will turn multiple values into a single value,
- * like creating a sum, finding the minimum or maximum, etc. In some cases
- * (e.g. finding the top K elements), an implementation may return more than
- * one value. The {@link org.apache.crunch.fn.Aggregators} utility class contains
- * factory methods for creating all kinds of pre-defined Aggregators that should
- * cover the most common cases.</p>
- *
- * <p>Aggregator implementations should usually be <em>associative</em> and
- * <em>commutative</em>, which makes their results deterministic. If your aggregation
- * function isn't commutative, you can still use secondary sort to that effect.</p>
- *
- * <p>The lifecycle of an {@link Aggregator} always begins with you instantiating
- * it and passing it to Crunch. When running your {@link Pipeline}, Crunch serializes
- * the instance and deserializes it wherever it is needed on the cluster. This is how
- * Crunch uses a deserialized instance:</p>
- *
- * <ol>
- *   <li>call {@link #initialize(Configuration)} once</li>
- *   <li>call {@link #reset()}</li>
- *   <li>call {@link #update(Object)} multiple times until all values of a sequence
- *       have been aggregated</li>
- *   <li>call {@link #results()} to retrieve the aggregated result</li>
- *   <li>go back to step 2 until all sequences have been aggregated</li>
- * </ol>
- *
- * @param <T> The value types to aggregate
- */
-public interface Aggregator<T> extends Serializable {
-
-  /**
-   * Perform any setup of this instance that is required prior to processing
-   * inputs.
-   *
-   * @param conf Hadoop configuration
-   */
-  void initialize(Configuration conf);
-
-  /**
-   * Clears the internal state of this Aggregator and prepares it for the
-   * values associated with the next key.
-   *
-   * Depending on what you aggregate, this typically means setting a variable
-   * to zero or clearing a list. Failing to do this will yield wrong results!
-   */
-  void reset();
-
-  /**
-   * Incorporate the given value into the aggregate state maintained by this
-   * instance.
-   *
-   * @param value The value to add to the aggregated state
-   */
-  void update(T value);
-
-  /**
-   * Returns the current aggregated state of this instance.
-   *
-   * @return the values aggregated since the last call to {@link #reset()}
-   */
-  Iterable<T> results();
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/CombineFn.java b/crunch/src/main/java/org/apache/crunch/CombineFn.java
deleted file mode 100644
index 71e8057..0000000
--- a/crunch/src/main/java/org/apache/crunch/CombineFn.java
+++ /dev/null
@@ -1,1211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.SortedSet;
-
-import org.apache.crunch.fn.Aggregators;
-import org.apache.crunch.util.Tuples;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * A special {@link DoFn} implementation that converts an {@link Iterable} of
- * values into a single value. If a {@code CombineFn} instance is used on a
- * {@link PGroupedTable}, the function will be applied to the output of the map
- * stage before the data is passed to the reducer, which can improve the runtime
- * of certain classes of jobs.
- * <p>
- * Note that the incoming {@code Iterable} can only be used to create an 
- * {@code Iterator} once. Calling {@link Iterable#iterator()} method a second
- * time will throw an {@link IllegalStateException}.
- */
-public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, T>> {
-
-  /**
-   * Contract for folding a sequence of values into a (usually smaller)
-   * sequence of values of the same type.
-   *
-   * @param <T> the type of the values being aggregated
-   * @deprecated Use {@link org.apache.crunch.Aggregator}
-   */
-  public static interface Aggregator<T> extends Serializable {
-    /**
-     * Perform any setup of this instance that is required prior to processing
-     * inputs.
-     *
-     * @param configuration the Hadoop configuration handed to this instance
-     */
-    void initialize(Configuration configuration);
-
-    /**
-     * Clears the internal state of this Aggregator and prepares it for the
-     * values associated with the next key.
-     */
-    void reset();
-
-    /**
-     * Incorporate the given value into the aggregate state maintained by this
-     * instance.
-     *
-     * @param value the value to fold into the aggregate state
-     */
-    void update(T value);
-
-    /**
-     * Returns the current aggregated state of this instance.
-     *
-     * @return the aggregated values
-     */
-    Iterable<T> results();
-  }
-
-  /**
-   * Convenience base for aggregators whose setup does not depend on the
-   * Hadoop configuration: {@code initialize} is implemented as a no-op so
-   * subclasses only supply {@code reset}, {@code update} and {@code results}.
-   *
-   * @deprecated Use {@link org.apache.crunch.fn.Aggregators.SimpleAggregator}
-   */
-  public abstract static class SimpleAggregator<T> implements Aggregator<T> {
-    /** Ignores the configuration; there is nothing to set up. */
-    @Override
-    public void initialize(Configuration conf) {
-      // Intentionally empty.
-    }
-  }
-  
-  /**
-   * Interface for constructing new aggregator instances.
-   *
-   * @param <T> the value type handled by the aggregators this factory creates
-   * @deprecated Use {@link PGroupedTable#combineValues(Aggregator)} which doesn't require a factory.
-   */
-  public static interface AggregatorFactory<T> {
-    /**
-     * Supplies an aggregator instance.
-     *
-     * @return an aggregator for values of type {@code T}
-     */
-    Aggregator<T> create();
-  }
-
-  /**
-   * Adapter that turns an {@code Aggregator} into a {@code CombineFn}: for
-   * every incoming key the aggregator is reset, fed all values of that key,
-   * and each of its results is emitted paired with the key.
-   *
-   * @deprecated Use the {@link Aggregators#toCombineFn(org.apache.crunch.Aggregator)} adapter
-   */
-  public static class AggregatorCombineFn<K, V> extends CombineFn<K, V> {
-
-    private final Aggregator<V> aggregator;
-
-    /**
-     * @param aggregator the aggregator that performs the actual combining
-     */
-    public AggregatorCombineFn(Aggregator<V> aggregator) {
-      this.aggregator = aggregator;
-    }
-
-    /** Forwards this function's configuration to the wrapped aggregator. */
-    @Override
-    public void initialize() {
-      final Configuration conf = getConfiguration();
-      this.aggregator.initialize(conf);
-    }
-
-    /**
-     * Aggregates the values of one key and emits one pair per aggregated result.
-     *
-     * @param input the key together with all of its values
-     * @param emitter receives a {@code (key, result)} pair per aggregator result
-     */
-    @Override
-    public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
-      // Start from a clean slate so state from the previous key cannot leak in.
-      this.aggregator.reset();
-      final Iterable<V> values = input.second();
-      for (V value : values) {
-        this.aggregator.update(value);
-      }
-      final Iterable<V> aggregated = this.aggregator.results();
-      for (V result : aggregated) {
-        final Pair<K, V> output = Pair.of(input.first(), result);
-        emitter.emit(output);
-      }
-    }
-  }
-
-  private static abstract class TupleAggregator<T> implements Aggregator<T> {
-    private final List<Aggregator<Object>> aggregators;
-
-    public TupleAggregator(Aggregator<?>... aggregators) {
-      this.aggregators = Lists.newArrayList();
-      for (Aggregator<?> a : aggregators) {
-        this.aggregators.add((Aggregator<Object>) a);
-      }
-    }
-
-    @Override
-    public void initialize(Configuration configuration) {
-      for (Aggregator<?> a : aggregators) {
-        a.initialize(configuration);
-      }
-    }
-    
-    @Override
-    public void reset() {
-      for (Aggregator<?> a : aggregators) {
-        a.reset();
-      }
-    }
-
-    protected void updateTuple(Tuple t) {
-      for (int i = 0; i < aggregators.size(); i++) {
-        aggregators.get(i).update(t.get(i));
-      }
-    }
-
-    protected Iterable<Object> results(int index) {
-      return aggregators.get(index).results();
-    }
-  }
-
-  /**
-   * Aggregates {@code Pair} values by handing each component to its own
-   * delegate aggregator.
-   *
-   * @deprecated Use {@link Aggregators#pairAggregator(Aggregator, Aggregator)}
-   */
-  public static class PairAggregator<V1, V2> extends TupleAggregator<Pair<V1, V2>> {
-
-    /**
-     * @param a1 aggregator for the first component
-     * @param a2 aggregator for the second component
-     */
-    public PairAggregator(Aggregator<V1> a1, Aggregator<V2> a2) {
-      super(a1, a2);
-    }
-
-    @Override
-    public void update(Pair<V1, V2> value) {
-      updateTuple(value);
-    }
-
-    /** Combines the per-component results into pairs. */
-    @Override
-    public Iterable<Pair<V1, V2>> results() {
-      final Iterable<V1> firsts = (Iterable<V1>) results(0);
-      final Iterable<V2> seconds = (Iterable<V2>) results(1);
-      return new Tuples.PairIterable<V1, V2>(firsts, seconds);
-    }
-  }
-
-  /**
-   * Aggregates {@code Tuple3} values by handing each component to its own
-   * delegate aggregator.
-   *
-   * @deprecated Use {@link Aggregators#tripAggregator(Aggregator, Aggregator, Aggregator)}
-   */
-  public static class TripAggregator<A, B, C> extends TupleAggregator<Tuple3<A, B, C>> {
-
-    /**
-     * @param a1 aggregator for the first component
-     * @param a2 aggregator for the second component
-     * @param a3 aggregator for the third component
-     */
-    public TripAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3) {
-      super(a1, a2, a3);
-    }
-
-    @Override
-    public void update(Tuple3<A, B, C> value) {
-      updateTuple(value);
-    }
-
-    /** Combines the per-component results into triples. */
-    @Override
-    public Iterable<Tuple3<A, B, C>> results() {
-      final Iterable<A> firsts = (Iterable<A>) results(0);
-      final Iterable<B> seconds = (Iterable<B>) results(1);
-      final Iterable<C> thirds = (Iterable<C>) results(2);
-      return new Tuples.TripIterable<A, B, C>(firsts, seconds, thirds);
-    }
-  }
-
-  /**
-   * Aggregates {@code Tuple4} values by handing each component to its own
-   * delegate aggregator.
-   *
-   * @deprecated Use {@link Aggregators#quadAggregator(Aggregator, Aggregator, Aggregator, Aggregator)}
-   */
-  public static class QuadAggregator<A, B, C, D> extends TupleAggregator<Tuple4<A, B, C, D>> {
-
-    /**
-     * @param a1 aggregator for the first component
-     * @param a2 aggregator for the second component
-     * @param a3 aggregator for the third component
-     * @param a4 aggregator for the fourth component
-     */
-    public QuadAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3, Aggregator<D> a4) {
-      super(a1, a2, a3, a4);
-    }
-
-    @Override
-    public void update(Tuple4<A, B, C, D> value) {
-      updateTuple(value);
-    }
-
-    /** Combines the per-component results into 4-tuples. */
-    @Override
-    public Iterable<Tuple4<A, B, C, D>> results() {
-      final Iterable<A> firsts = (Iterable<A>) results(0);
-      final Iterable<B> seconds = (Iterable<B>) results(1);
-      final Iterable<C> thirds = (Iterable<C>) results(2);
-      final Iterable<D> fourths = (Iterable<D>) results(3);
-      return new Tuples.QuadIterable<A, B, C, D>(firsts, seconds, thirds, fourths);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#tupleAggregator(Aggregator...)}
-   */
-  public static class TupleNAggregator extends TupleAggregator<TupleN> {
-
-    private final int size;
-
-    public TupleNAggregator(Aggregator<?>... aggregators) {
-      super(aggregators);
-      size = aggregators.length;
-    }
-
-    @Override
-    public void update(TupleN value) {
-      updateTuple(value);
-    }
-
-    @Override
-    public Iterable<TupleN> results() {
-      Iterable<?>[] iterables = new Iterable[size];
-      for (int i = 0; i < size; i++) {
-        iterables[i] = results(i);
-      }
-      return new Tuples.TupleNIterable(iterables);
-    }
-
-  }
-
-  /**
-   * Wraps the given aggregator in an {@link AggregatorCombineFn}.
-   *
-   * @param aggregator the aggregator to adapt
-   * @return a {@code CombineFn} delegating to {@code aggregator}
-   * @deprecated Use {@link Aggregators#toCombineFn(Aggregator)}
-   */
-  public static final <K, V> CombineFn<K, V> aggregator(Aggregator<V> aggregator) {
-    final CombineFn<K, V> combiner = new AggregatorCombineFn<K, V>(aggregator);
-    return combiner;
-  }
-
-  /**
-   * Asks the factory for one aggregator and wraps it in an
-   * {@link AggregatorCombineFn}.
-   *
-   * @param aggregator the factory that supplies the aggregator to adapt
-   * @return a {@code CombineFn} delegating to the created aggregator
-   * @deprecated Use {@link PGroupedTable#combineValues(Aggregator)} which doesn't require a factory.
-   */
-  public static final <K, V> CombineFn<K, V> aggregatorFactory(AggregatorFactory<V> aggregator) {
-    final Aggregator<V> created = aggregator.create();
-    return new AggregatorCombineFn<K, V>(created);
-  }
-
-  /**
-   * Builds a {@code CombineFn} over {@code Pair} values from one factory per
-   * component.
-   *
-   * @param a1 factory for the first component's aggregator
-   * @param a2 factory for the second component's aggregator
-   * @return a {@code CombineFn} backed by a {@link PairAggregator}
-   * @deprecated Use {@link Aggregators#pairAggregator(Aggregator, Aggregator)}
-   */
-  public static final <K, V1, V2> CombineFn<K, Pair<V1, V2>> pairAggregator(AggregatorFactory<V1> a1,
-      AggregatorFactory<V2> a2) {
-    final Aggregator<V1> first = a1.create();
-    final Aggregator<V2> second = a2.create();
-    return CombineFn.<K, Pair<V1, V2>>aggregator(new PairAggregator<V1, V2>(first, second));
-  }
-
-  /**
-   * Builds a {@code CombineFn} over {@code Tuple3} values from one factory
-   * per component.
-   *
-   * @param a1 factory for the first component's aggregator
-   * @param a2 factory for the second component's aggregator
-   * @param a3 factory for the third component's aggregator
-   * @return a {@code CombineFn} backed by a {@link TripAggregator}
-   * @deprecated Use {@link Aggregators#tripAggregator(Aggregator, Aggregator, Aggregator)}
-   */
-  public static final <K, A, B, C> CombineFn<K, Tuple3<A, B, C>> tripAggregator(AggregatorFactory<A> a1,
-      AggregatorFactory<B> a2, AggregatorFactory<C> a3) {
-    final Aggregator<A> first = a1.create();
-    final Aggregator<B> second = a2.create();
-    final Aggregator<C> third = a3.create();
-    return CombineFn.<K, Tuple3<A, B, C>>aggregator(new TripAggregator<A, B, C>(first, second, third));
-  }
-
-  /**
-   * Builds a {@code CombineFn} over {@code Tuple4} values from one factory
-   * per component.
-   *
-   * @param a1 factory for the first component's aggregator
-   * @param a2 factory for the second component's aggregator
-   * @param a3 factory for the third component's aggregator
-   * @param a4 factory for the fourth component's aggregator
-   * @return a {@code CombineFn} backed by a {@link QuadAggregator}
-   * @deprecated Use {@link Aggregators#quadAggregator(Aggregator, Aggregator, Aggregator, Aggregator)}
-   */
-  public static final <K, A, B, C, D> CombineFn<K, Tuple4<A, B, C, D>> quadAggregator(AggregatorFactory<A> a1,
-      AggregatorFactory<B> a2, AggregatorFactory<C> a3, AggregatorFactory<D> a4) {
-    final Aggregator<A> first = a1.create();
-    final Aggregator<B> second = a2.create();
-    final Aggregator<C> third = a3.create();
-    final Aggregator<D> fourth = a4.create();
-    return CombineFn.<K, Tuple4<A, B, C, D>>aggregator(
-        new QuadAggregator<A, B, C, D>(first, second, third, fourth));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#tupleAggregator(Aggregator...)}
-   */
-  public static final <K> CombineFn<K, TupleN> tupleAggregator(AggregatorFactory<?>... factories) {
-    Aggregator<?>[] aggs = new Aggregator[factories.length];
-    for (int i = 0; i < aggs.length; i++) {
-      aggs[i] = factories[i].create();
-    }
-    return aggregator(new TupleNAggregator(aggs));
-  }
-
-  /**
-   * Returns a {@code CombineFn} built from the {@link #SUM_LONGS} factory.
-   *
-   * @deprecated Use {@link Aggregators#SUM_LONGS()}
-   */
-  public static final <K> CombineFn<K, Long> SUM_LONGS() {
-    return CombineFn.<K, Long>aggregatorFactory(SUM_LONGS);
-  }
-
-  /**
-   * Returns a {@code CombineFn} built from the {@link #SUM_INTS} factory.
-   *
-   * @deprecated Use {@link Aggregators#SUM_INTS()}
-   */
-  public static final <K> CombineFn<K, Integer> SUM_INTS() {
-    return CombineFn.<K, Integer>aggregatorFactory(SUM_INTS);
-  }
-
-  /**
-   * Returns a {@code CombineFn} built from the {@link #SUM_FLOATS} factory.
-   *
-   * @deprecated Use {@link Aggregators#SUM_FLOATS()}
-   */
-  public static final <K> CombineFn<K, Float> SUM_FLOATS() {
-    return CombineFn.<K, Float>aggregatorFactory(SUM_FLOATS);
-  }
-
-  /**
-   * Returns a {@code CombineFn} built from the {@link #SUM_DOUBLES} factory.
-   *
-   * @deprecated Use {@link Aggregators#SUM_DOUBLES()}
-   */
-  public static final <K> CombineFn<K, Double> SUM_DOUBLES() {
-    return CombineFn.<K, Double>aggregatorFactory(SUM_DOUBLES);
-  }
-
-  /**
-   * Returns a {@code CombineFn} built from the {@link #SUM_BIGINTS} factory.
-   *
-   * @deprecated Use {@link Aggregators#SUM_BIGINTS()}
-   */
-  public static final <K> CombineFn<K, BigInteger> SUM_BIGINTS() {
-    return CombineFn.<K, BigInteger>aggregatorFactory(SUM_BIGINTS);
-  }
-
-  /**
-   * Returns a {@code CombineFn} built from the {@link #MAX_LONGS} factory.
-   *
-   * @deprecated Use {@link Aggregators#MAX_LONGS()}
-   */
-  public static final <K> CombineFn<K, Long> MAX_LONGS() {
-    return CombineFn.<K, Long>aggregatorFactory(MAX_LONGS);
-  }
-
-  /**
-   * Returns a {@code CombineFn} backed by a {@link MaxNAggregator} over
-   * {@code Long} values.
-   *
-   * @param n the arity passed to the {@code MaxNAggregator}
-   * @deprecated Use {@link Aggregators#MAX_LONGS(int)}
-   */
-  public static final <K> CombineFn<K, Long> MAX_LONGS(int n) {
-    return CombineFn.<K, Long>aggregator(new MaxNAggregator<Long>(n));
-  }
-
-  /**
-   * Returns a {@code CombineFn} built from the {@link #MAX_INTS} factory.
-   *
-   * @deprecated Use {@link Aggregators#MAX_INTS()}
-   */
-  public static final <K> CombineFn<K, Integer> MAX_INTS() {
-    return CombineFn.<K, Integer>aggregatorFactory(MAX_INTS);
-  }
-
-  /**
-   * Returns a {@code CombineFn} backed by a {@link MaxNAggregator} over
-   * {@code Integer} values.
-   *
-   * @param n the arity passed to the {@code MaxNAggregator}
-   * @deprecated Use {@link Aggregators#MAX_INTS(int)}
-   */
-  public static final <K> CombineFn<K, Integer> MAX_INTS(int n) {
-    return CombineFn.<K, Integer>aggregator(new MaxNAggregator<Integer>(n));
-  }
-
-  /**
-   * Returns a {@code CombineFn} built from the {@link #MAX_FLOATS} factory.
-   *
-   * @deprecated Use {@link Aggregators#MAX_FLOATS()}
-   */
-  public static final <K> CombineFn<K, Float> MAX_FLOATS() {
-    return CombineFn.<K, Float>aggregatorFactory(MAX_FLOATS);
-  }
-
-  /**
-   * Returns a {@code CombineFn} backed by a {@link MaxNAggregator} over
-   * {@code Float} values.
-   *
-   * @param n the arity passed to the {@code MaxNAggregator}
-   * @deprecated Use {@link Aggregators#MAX_FLOATS(int)}
-   */
-  public static final <K> CombineFn<K, Float> MAX_FLOATS(int n) {
-    return CombineFn.<K, Float>aggregator(new MaxNAggregator<Float>(n));
-  }
-
-  /**
-   * Returns a {@code CombineFn} built from the {@link #MAX_DOUBLES} factory.
-   *
-   * @deprecated Use {@link Aggregators#MAX_DOUBLES()}
-   */
-  public static final <K> CombineFn<K, Double> MAX_DOUBLES() {
-    return CombineFn.<K, Double>aggregatorFactory(MAX_DOUBLES);
-  }
-
-  /**
-   * Returns a {@code CombineFn} backed by a {@link MaxNAggregator} over
-   * {@code Double} values.
-   *
-   * @param n the arity passed to the {@code MaxNAggregator}
-   * @deprecated Use {@link Aggregators#MAX_DOUBLES(int)}
-   */
-  public static final <K> CombineFn<K, Double> MAX_DOUBLES(int n) {
-    return CombineFn.<K, Double>aggregator(new MaxNAggregator<Double>(n));
-  }
-
-  /**
-   * Returns a {@code CombineFn} built from the {@link #MAX_BIGINTS} factory.
-   *
-   * @deprecated Use {@link Aggregators#MAX_BIGINTS()}
-   */
-  public static final <K> CombineFn<K, BigInteger> MAX_BIGINTS() {
-    return CombineFn.<K, BigInteger>aggregatorFactory(MAX_BIGINTS);
-  }
-
-  /**
-   * Returns a {@code CombineFn} backed by a {@link MaxNAggregator} over
-   * {@code BigInteger} values.
-   *
-   * @param n the arity passed to the {@code MaxNAggregator}
-   * @deprecated Use {@link Aggregators#MAX_BIGINTS(int)}
-   */
-  public static final <K> CombineFn<K, BigInteger> MAX_BIGINTS(int n) {
-    return CombineFn.<K, BigInteger>aggregator(new MaxNAggregator<BigInteger>(n));
-  }
-
-  /**
-   * Returns a {@code CombineFn} built from the {@link #MIN_LONGS} factory.
-   *
-   * @deprecated Use {@link Aggregators#MIN_LONGS()}
-   */
-  public static final <K> CombineFn<K, Long> MIN_LONGS() {
-    return CombineFn.<K, Long>aggregatorFactory(MIN_LONGS);
-  }
-
-  /**
-   * Returns a {@code CombineFn} backed by a {@code MinNAggregator} over
-   * {@code Long} values.
-   *
-   * @param n the arity passed to the {@code MinNAggregator}
-   * @deprecated Use {@link Aggregators#MIN_LONGS(int)}
-   */
-  public static final <K> CombineFn<K, Long> MIN_LONGS(int n) {
-    return CombineFn.<K, Long>aggregator(new MinNAggregator<Long>(n));
-  }
-
-  /**
-   * Returns a {@code CombineFn} built from the {@link #MIN_INTS} factory.
-   *
-   * @deprecated Use {@link Aggregators#MIN_INTS()}
-   */
-  public static final <K> CombineFn<K, Integer> MIN_INTS() {
-    return CombineFn.<K, Integer>aggregatorFactory(MIN_INTS);
-  }
-
-  /**
-   * Returns a {@code CombineFn} backed by a {@code MinNAggregator} over
-   * {@code Integer} values.
-   *
-   * @param n the arity passed to the {@code MinNAggregator}
-   * @deprecated Use {@link Aggregators#MIN_INTS(int)}
-   */
-  public static final <K> CombineFn<K, Integer> MIN_INTS(int n) {
-    return CombineFn.<K, Integer>aggregator(new MinNAggregator<Integer>(n));
-  }
-
-  /**
-   * Returns a {@code CombineFn} built from the {@link #MIN_FLOATS} factory.
-   *
-   * @deprecated Use {@link Aggregators#MIN_FLOATS()}
-   */
-  public static final <K> CombineFn<K, Float> MIN_FLOATS() {
-    return CombineFn.<K, Float>aggregatorFactory(MIN_FLOATS);
-  }
-
-  /**
-   * Returns a {@code CombineFn} backed by a {@code MinNAggregator} over
-   * {@code Float} values.
-   *
-   * @param n the arity passed to the {@code MinNAggregator}
-   * @deprecated Use {@link Aggregators#MIN_FLOATS(int)}
-   */
-  public static final <K> CombineFn<K, Float> MIN_FLOATS(int n) {
-    return CombineFn.<K, Float>aggregator(new MinNAggregator<Float>(n));
-  }
-
-  /**
-   * Returns a {@code CombineFn} built from the {@link #MIN_DOUBLES} factory.
-   *
-   * @deprecated Use {@link Aggregators#MIN_DOUBLES()}
-   */
-  public static final <K> CombineFn<K, Double> MIN_DOUBLES() {
-    return CombineFn.<K, Double>aggregatorFactory(MIN_DOUBLES);
-  }
-
-  /**
-   * Returns a {@code CombineFn} backed by a {@code MinNAggregator} over
-   * {@code Double} values.
-   *
-   * @param n the arity passed to the {@code MinNAggregator}
-   * @deprecated Use {@link Aggregators#MIN_DOUBLES(int)}
-   */
-  public static final <K> CombineFn<K, Double> MIN_DOUBLES(int n) {
-    return CombineFn.<K, Double>aggregator(new MinNAggregator<Double>(n));
-  }
-
-  /**
-   * Returns a {@code CombineFn} built from the {@link #MIN_BIGINTS} factory.
-   *
-   * @deprecated Use {@link Aggregators#MIN_BIGINTS()}
-   */
-  public static final <K> CombineFn<K, BigInteger> MIN_BIGINTS() {
-    return CombineFn.<K, BigInteger>aggregatorFactory(MIN_BIGINTS);
-  }
-
-  /**
-   * Returns a {@code CombineFn} backed by a {@code MinNAggregator} over
-   * {@code BigInteger} values.
-   *
-   * @param n the arity passed to the {@code MinNAggregator}
-   * @deprecated Use {@link Aggregators#MIN_BIGINTS(int)}
-   */
-  public static final <K> CombineFn<K, BigInteger> MIN_BIGINTS(int n) {
-    return CombineFn.<K, BigInteger>aggregator(new MinNAggregator<BigInteger>(n));
-  }
-
-  /**
-   * Returns a {@code CombineFn} backed by a {@code FirstNAggregator}.
-   *
-   * @param n the count passed to the {@code FirstNAggregator}
-   * @deprecated Use {@link Aggregators#FIRST_N(int)}
-   */
-  public static final <K, V> CombineFn<K, V> FIRST_N(int n) {
-    return CombineFn.<K, V>aggregator(new FirstNAggregator<V>(n));
-  }
-
-  /**
-   * Returns a {@code CombineFn} backed by a {@code LastNAggregator}.
-   *
-   * @param n the count passed to the {@code LastNAggregator}
-   * @deprecated Use {@link Aggregators#LAST_N(int)}
-   */
-  public static final <K, V> CombineFn<K, V> LAST_N(int n) {
-    return CombineFn.<K, V>aggregator(new LastNAggregator<V>(n));
-  }
-
-  /**
-   * Concatenates strings, placing a separator between consecutive strings.
-   * The length of the concatenated string is not limited.
-   *
-   * @param separator
-   *            the separator appended between consecutive strings
-   * @param skipNull
-   *            whether null values are skipped; when {@code false}, a null
-   *            value causes a {@code NullPointerException}
-   * @return a {@code CombineFn} backed by a {@code StringConcatAggregator}
-   *
-   * @deprecated Use {@link Aggregators#STRING_CONCAT(String, boolean)}
-   */
-  public static final <K> CombineFn<K, String> STRING_CONCAT(final String separator, final boolean skipNull) {
-    return CombineFn.<K, String>aggregator(new StringConcatAggregator(separator, skipNull));
-  }
-
-  /**
-   * Used to concatenate strings, with a separator between each strings. You
-   * can specify the maximum length of the output string and of the input
-   * strings, if they are > 0. If a value is <= 0, there is no limits.
-   * 
-   * Any too large string (or any string which would made the output too
-   * large) will be silently discarded.
-   * 
-   * @param separator
-   *            the separator which will be appended between each string
-   * @param skipNull
-   *            define if we should skip null values. Throw
-   *            NullPointerException if set to false and there is a null
-   *            value.
-   * @param maxOutputLength
-   *            the maximum length of the output string. If it's set <= 0,
-   *            there is no limits. The number of characters of the output
-   *            string will be < maxOutputLength.
-   * @param maxInputLength
-   *            the maximum length of the input strings. If it's set <= 0,
-   *            there is no limits. The number of characters of the int string
-   *            will be < maxInputLength to be concatenated.
-   * @return
-   *
-   * @deprecated Use {@link Aggregators#STRING_CONCAT(String, boolean, long, long)}
-   */
-  public static final <K> CombineFn<K, String> STRING_CONCAT(final String separator, final boolean skipNull, final long maxOutputLength, final long maxInputLength) {
-      return aggregator(new StringConcatAggregator(separator, skipNull, maxOutputLength, maxInputLength));
-  }
-
-  /**
-   * Keeps a running {@code long} total of the values seen since the last
-   * {@link #reset()}.
-   *
-   * @deprecated Use {@link Aggregators#SUM_LONGS()}
-   */
-  public static class SumLongs extends SimpleAggregator<Long> {
-    private long sum = 0L;
-
-    /** Sets the running total back to zero. */
-    @Override
-    public void reset() {
-      this.sum = 0L;
-    }
-
-    /** Adds {@code next} to the running total. */
-    @Override
-    public void update(Long next) {
-      this.sum = this.sum + next.longValue();
-    }
-
-    /** Returns the running total as a single-element list. */
-    @Override
-    public Iterable<Long> results() {
-      return ImmutableList.of(Long.valueOf(this.sum));
-    }
-  }
-
-  /**
-   * Factory that returns a new {@link SumLongs} from every {@code create()}
-   * call. Declared {@code final} so the shared factory cannot be reassigned.
-   *
-   * @deprecated Use {@link Aggregators#SUM_LONGS()}
-   */
-  public static final AggregatorFactory<Long> SUM_LONGS = new AggregatorFactory<Long>() {
-    @Override
-    public Aggregator<Long> create() {
-      return new SumLongs();
-    }
-  };
-
-  /**
-   * Keeps a running {@code int} total of the values seen since the last
-   * {@link #reset()}.
-   *
-   * @deprecated Use {@link Aggregators#SUM_INTS()}
-   */
-  public static class SumInts extends SimpleAggregator<Integer> {
-    private int sum = 0;
-
-    /** Sets the running total back to zero. */
-    @Override
-    public void reset() {
-      this.sum = 0;
-    }
-
-    /** Adds {@code next} to the running total. */
-    @Override
-    public void update(Integer next) {
-      this.sum = this.sum + next.intValue();
-    }
-
-    /** Returns the running total as a single-element list. */
-    @Override
-    public Iterable<Integer> results() {
-      return ImmutableList.of(Integer.valueOf(this.sum));
-    }
-  }
-
-  /**
-   * Factory that returns a new {@link SumInts} from every {@code create()}
-   * call. Declared {@code final} so the shared factory cannot be reassigned.
-   *
-   * @deprecated Use {@link Aggregators#SUM_INTS()}
-   */
-  public static final AggregatorFactory<Integer> SUM_INTS = new AggregatorFactory<Integer>() {
-    @Override
-    public Aggregator<Integer> create() {
-      return new SumInts();
-    }
-  };
-
-  /**
-   * Keeps a running {@code float} total of the values seen since the last
-   * {@link #reset()}.
-   *
-   * @deprecated Use {@link Aggregators#SUM_FLOATS()}
-   */
-  public static class SumFloats extends SimpleAggregator<Float> {
-    private float sum = 0.0f;
-
-    /** Sets the running total back to zero. */
-    @Override
-    public void reset() {
-      this.sum = 0.0f;
-    }
-
-    /** Adds {@code next} to the running total. */
-    @Override
-    public void update(Float next) {
-      this.sum = this.sum + next.floatValue();
-    }
-
-    /** Returns the running total as a single-element list. */
-    @Override
-    public Iterable<Float> results() {
-      return ImmutableList.of(Float.valueOf(this.sum));
-    }
-  }
-
-  /**
-   * Factory that returns a new {@link SumFloats} from every {@code create()}
-   * call. Declared {@code final} so the shared factory cannot be reassigned.
-   *
-   * @deprecated Use {@link Aggregators#SUM_FLOATS()}
-   */
-  public static final AggregatorFactory<Float> SUM_FLOATS = new AggregatorFactory<Float>() {
-    @Override
-    public Aggregator<Float> create() {
-      return new SumFloats();
-    }
-  };
-
-  /**
-   * Keeps a running {@code double} total of the values seen since the last
-   * {@link #reset()}.
-   *
-   * @deprecated Use {@link Aggregators#SUM_DOUBLES()}
-   */
-  public static class SumDoubles extends SimpleAggregator<Double> {
-    private double sum = 0.0d;
-
-    /** Sets the running total back to zero. */
-    @Override
-    public void reset() {
-      this.sum = 0.0d;
-    }
-
-    /** Adds {@code next} to the running total. */
-    @Override
-    public void update(Double next) {
-      this.sum = this.sum + next.doubleValue();
-    }
-
-    /** Returns the running total as a single-element list. */
-    @Override
-    public Iterable<Double> results() {
-      return ImmutableList.of(Double.valueOf(this.sum));
-    }
-  }
-
-  /**
-   * Factory that returns a new {@link SumDoubles} from every {@code create()}
-   * call. Declared {@code final} so the shared factory cannot be reassigned.
-   *
-   * @deprecated Use {@link Aggregators#SUM_DOUBLES()}
-   */
-  public static final AggregatorFactory<Double> SUM_DOUBLES = new AggregatorFactory<Double>() {
-    @Override
-    public Aggregator<Double> create() {
-      return new SumDoubles();
-    }
-  };
-
-  /**
-   * Keeps a running {@code BigInteger} total of the values seen since the
-   * last {@link #reset()}.
-   *
-   * @deprecated Use {@link Aggregators#SUM_BIGINTS()}
-   */
-  public static class SumBigInts extends SimpleAggregator<BigInteger> {
-    private BigInteger sum = BigInteger.ZERO;
-
-    /** Sets the running total back to zero. */
-    @Override
-    public void reset() {
-      this.sum = BigInteger.ZERO;
-    }
-
-    /** Adds {@code next} to the running total. */
-    @Override
-    public void update(BigInteger next) {
-      final BigInteger total = this.sum.add(next);
-      this.sum = total;
-    }
-
-    /** Returns the running total as a single-element list. */
-    @Override
-    public Iterable<BigInteger> results() {
-      final BigInteger total = this.sum;
-      return ImmutableList.of(total);
-    }
-  }
-
-  /**
-   * Factory that returns a new {@link SumBigInts} from every {@code create()}
-   * call. Declared {@code final} so the shared factory cannot be reassigned.
-   *
-   * @deprecated Use {@link Aggregators#SUM_BIGINTS()}
-   */
-  public static final AggregatorFactory<BigInteger> SUM_BIGINTS = new AggregatorFactory<BigInteger>() {
-    @Override
-    public Aggregator<BigInteger> create() {
-      return new SumBigInts();
-    }
-  };
-
-  /**
-   * Tracks the largest {@code Long} seen since the last {@link #reset()}.
-   *
-   * @deprecated Use {@link Aggregators#MAX_LONGS()}
-   */
-  public static class MaxLongs extends SimpleAggregator<Long> {
-    // null means "no value seen since the last reset".
-    private Long max = null;
-
-    @Override
-    public void reset() {
-      max = null;
-    }
-
-    @Override
-    public void update(Long next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-
-    /**
-     * Returns the maximum as a single-element list, or an empty list when no
-     * value has been seen since the last reset.
-     */
-    @Override
-    public Iterable<Long> results() {
-      if (max == null) {
-        // ImmutableList.of(null) would throw a NullPointerException.
-        return ImmutableList.<Long>of();
-      }
-      return ImmutableList.of(max);
-    }
-  }
-
-  /**
-   * Factory that returns a new {@link MaxLongs} from every {@code create()}
-   * call. Declared {@code final} so the shared factory cannot be reassigned.
-   *
-   * @deprecated Use {@link Aggregators#MAX_LONGS()}
-   */
-  public static final AggregatorFactory<Long> MAX_LONGS = new AggregatorFactory<Long>() {
-    @Override
-    public Aggregator<Long> create() {
-      return new MaxLongs();
-    }
-  };
-
-  /**
-   * Tracks the largest {@code Integer} seen since the last {@link #reset()}.
-   *
-   * @deprecated Use {@link Aggregators#MAX_INTS()}
-   */
-  public static class MaxInts extends SimpleAggregator<Integer> {
-    // null means "no value seen since the last reset".
-    private Integer max = null;
-
-    @Override
-    public void reset() {
-      max = null;
-    }
-
-    @Override
-    public void update(Integer next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-
-    /**
-     * Returns the maximum as a single-element list, or an empty list when no
-     * value has been seen since the last reset.
-     */
-    @Override
-    public Iterable<Integer> results() {
-      if (max == null) {
-        // ImmutableList.of(null) would throw a NullPointerException.
-        return ImmutableList.<Integer>of();
-      }
-      return ImmutableList.of(max);
-    }
-  }
-
-  /**
-   * Factory that returns a new {@link MaxInts} from every {@code create()}
-   * call. Declared {@code final} so the shared factory cannot be reassigned.
-   *
-   * @deprecated Use {@link Aggregators#MAX_INTS()}
-   */
-  public static final AggregatorFactory<Integer> MAX_INTS = new AggregatorFactory<Integer>() {
-    @Override
-    public Aggregator<Integer> create() {
-      return new MaxInts();
-    }
-  };
-
-  /**
-   * Tracks the largest {@code Float} seen since the last {@link #reset()}.
-   *
-   * @deprecated Use {@link Aggregators#MAX_FLOATS()}
-   */
-  public static class MaxFloats extends SimpleAggregator<Float> {
-    // null means "no value seen since the last reset".
-    private Float max = null;
-
-    @Override
-    public void reset() {
-      max = null;
-    }
-
-    @Override
-    public void update(Float next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-
-    /**
-     * Returns the maximum as a single-element list, or an empty list when no
-     * value has been seen since the last reset.
-     */
-    @Override
-    public Iterable<Float> results() {
-      if (max == null) {
-        // ImmutableList.of(null) would throw a NullPointerException.
-        return ImmutableList.<Float>of();
-      }
-      return ImmutableList.of(max);
-    }
-  }
-
-  /**
-   * Factory that returns a new {@link MaxFloats} from every {@code create()}
-   * call. Declared {@code final} so the shared factory cannot be reassigned.
-   *
-   * @deprecated Use {@link Aggregators#MAX_FLOATS()}
-   */
-  public static final AggregatorFactory<Float> MAX_FLOATS = new AggregatorFactory<Float>() {
-    @Override
-    public Aggregator<Float> create() {
-      return new MaxFloats();
-    }
-  };
-
-  /**
-   * Tracks the largest {@code Double} seen since the last {@link #reset()}.
-   *
-   * @deprecated Use {@link Aggregators#MAX_DOUBLES()}
-   */
-  public static class MaxDoubles extends SimpleAggregator<Double> {
-    // null means "no value seen since the last reset".
-    private Double max = null;
-
-    @Override
-    public void reset() {
-      max = null;
-    }
-
-    @Override
-    public void update(Double next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-
-    /**
-     * Returns the maximum as a single-element list, or an empty list when no
-     * value has been seen since the last reset.
-     */
-    @Override
-    public Iterable<Double> results() {
-      if (max == null) {
-        // ImmutableList.of(null) would throw a NullPointerException.
-        return ImmutableList.<Double>of();
-      }
-      return ImmutableList.of(max);
-    }
-  }
-
-  /**
-   * Factory that returns a new {@link MaxDoubles} from every {@code create()}
-   * call. Declared {@code final} so the shared factory cannot be reassigned.
-   *
-   * @deprecated Use {@link Aggregators#MAX_DOUBLES()}
-   */
-  public static final AggregatorFactory<Double> MAX_DOUBLES = new AggregatorFactory<Double>() {
-    @Override
-    public Aggregator<Double> create() {
-      return new MaxDoubles();
-    }
-  };
-
-  /**
-   * Tracks the largest {@code BigInteger} seen since the last
-   * {@link #reset()}.
-   *
-   * @deprecated Use {@link Aggregators#MAX_BIGINTS()}
-   */
-  public static class MaxBigInts extends SimpleAggregator<BigInteger> {
-    // null means "no value seen since the last reset".
-    private BigInteger max = null;
-
-    @Override
-    public void reset() {
-      max = null;
-    }
-
-    @Override
-    public void update(BigInteger next) {
-      if (max == null || max.compareTo(next) < 0) {
-        max = next;
-      }
-    }
-
-    /**
-     * Returns the maximum as a single-element list, or an empty list when no
-     * value has been seen since the last reset.
-     */
-    @Override
-    public Iterable<BigInteger> results() {
-      if (max == null) {
-        // ImmutableList.of(null) would throw a NullPointerException.
-        return ImmutableList.<BigInteger>of();
-      }
-      return ImmutableList.of(max);
-    }
-  }
-
-  /**
-   * Factory that returns a new {@link MaxBigInts} from every {@code create()}
-   * call. Declared {@code final} so the shared factory cannot be reassigned.
-   *
-   * @deprecated Use {@link Aggregators#MAX_BIGINTS()}
-   */
-  public static final AggregatorFactory<BigInteger> MAX_BIGINTS = new AggregatorFactory<BigInteger>() {
-    @Override
-    public Aggregator<BigInteger> create() {
-      return new MaxBigInts();
-    }
-  };
-
-  /**
-   * Tracks the smallest {@code Long} seen since the last {@link #reset()}.
-   *
-   * @deprecated Use {@link Aggregators#MIN_LONGS()}
-   */
-  public static class MinLongs extends SimpleAggregator<Long> {
-    // null means "no value seen since the last reset".
-    private Long min = null;
-
-    @Override
-    public void reset() {
-      min = null;
-    }
-
-    @Override
-    public void update(Long next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-
-    /**
-     * Returns the minimum as a single-element list, or an empty list when no
-     * value has been seen since the last reset.
-     */
-    @Override
-    public Iterable<Long> results() {
-      if (min == null) {
-        // ImmutableList.of(null) would throw a NullPointerException.
-        return ImmutableList.<Long>of();
-      }
-      return ImmutableList.of(min);
-    }
-  }
-
-  /**
-   * Factory that returns a new {@link MinLongs} from every {@code create()}
-   * call. Declared {@code final} so the shared factory cannot be reassigned.
-   *
-   * @deprecated Use {@link Aggregators#MIN_LONGS()}
-   */
-  public static final AggregatorFactory<Long> MIN_LONGS = new AggregatorFactory<Long>() {
-    @Override
-    public Aggregator<Long> create() {
-      return new MinLongs();
-    }
-  };
-
-  /**
-   * Tracks the smallest {@code Integer} seen since the last {@link #reset()}.
-   *
-   * @deprecated Use {@link Aggregators#MIN_INTS()}
-   */
-  public static class MinInts extends SimpleAggregator<Integer> {
-    // null means "no value seen since the last reset".
-    private Integer min = null;
-
-    @Override
-    public void reset() {
-      min = null;
-    }
-
-    @Override
-    public void update(Integer next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-
-    /**
-     * Returns the minimum as a single-element list, or an empty list when no
-     * value has been seen since the last reset.
-     */
-    @Override
-    public Iterable<Integer> results() {
-      if (min == null) {
-        // ImmutableList.of(null) would throw a NullPointerException.
-        return ImmutableList.<Integer>of();
-      }
-      return ImmutableList.of(min);
-    }
-  }
-
-  /**
-   * Factory that returns a new {@link MinInts} from every {@code create()}
-   * call. Declared {@code final} so the shared factory cannot be reassigned.
-   *
-   * @deprecated Use {@link Aggregators#MIN_INTS()}
-   */
-  public static final AggregatorFactory<Integer> MIN_INTS = new AggregatorFactory<Integer>() {
-    @Override
-    public Aggregator<Integer> create() {
-      return new MinInts();
-    }
-  };
-
-  /**
-   * Tracks the smallest {@code Float} seen since the last {@link #reset()}.
-   *
-   * @deprecated Use {@link Aggregators#MIN_FLOATS()}
-   */
-  public static class MinFloats extends SimpleAggregator<Float> {
-    // null means "no value seen since the last reset".
-    private Float min = null;
-
-    @Override
-    public void reset() {
-      min = null;
-    }
-
-    @Override
-    public void update(Float next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-
-    /**
-     * Returns the minimum as a single-element list, or an empty list when no
-     * value has been seen since the last reset.
-     */
-    @Override
-    public Iterable<Float> results() {
-      if (min == null) {
-        // ImmutableList.of(null) would throw a NullPointerException.
-        return ImmutableList.<Float>of();
-      }
-      return ImmutableList.of(min);
-    }
-  }
-
-  /**
-   * Factory that returns a new {@link MinFloats} from every {@code create()}
-   * call. Declared {@code final} so the shared factory cannot be reassigned.
-   *
-   * @deprecated Use {@link Aggregators#MIN_FLOATS()}
-   */
-  public static final AggregatorFactory<Float> MIN_FLOATS = new AggregatorFactory<Float>() {
-    @Override
-    public Aggregator<Float> create() {
-      return new MinFloats();
-    }
-  };
-
-  /**
-   * Tracks the smallest {@code Double} seen since the last {@link #reset()}.
-   *
-   * @deprecated Use {@link Aggregators#MIN_DOUBLES()}
-   */
-  public static class MinDoubles extends SimpleAggregator<Double> {
-    // null means "no value seen since the last reset".
-    private Double min = null;
-
-    @Override
-    public void reset() {
-      min = null;
-    }
-
-    @Override
-    public void update(Double next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-
-    /**
-     * Returns the minimum as a single-element list, or an empty list when no
-     * value has been seen since the last reset.
-     */
-    @Override
-    public Iterable<Double> results() {
-      if (min == null) {
-        // ImmutableList.of(null) would throw a NullPointerException.
-        return ImmutableList.<Double>of();
-      }
-      return ImmutableList.of(min);
-    }
-  }
-
-  /**
-   * Factory that returns a new {@link MinDoubles} from every {@code create()}
-   * call. Declared {@code final} so the shared factory cannot be reassigned.
-   *
-   * @deprecated Use {@link Aggregators#MIN_DOUBLES()}
-   */
-  public static final AggregatorFactory<Double> MIN_DOUBLES = new AggregatorFactory<Double>() {
-    @Override
-    public Aggregator<Double> create() {
-      return new MinDoubles();
-    }
-  };
-
-  /**
-   * Tracks the smallest {@code BigInteger} passed to {@code update} since the last
-   * {@code reset}.
-   *
-   * @deprecated Use {@link Aggregators#MIN_BIGINTS()}
-   */
-  public static class MinBigInts extends SimpleAggregator<BigInteger> {
-    // Null until the first update() after a reset().
-    private BigInteger min;
-
-    /** Forgets the running minimum so the aggregator can be reused. */
-    @Override
-    public void reset() {
-      min = null;
-    }
-
-    /** Adopts {@code next} when it is the first value or strictly below the current minimum. */
-    @Override
-    public void update(BigInteger next) {
-      if (min != null && next.compareTo(min) >= 0) {
-        return;
-      }
-      min = next;
-    }
-
-    /** Returns a one-element list holding the minimum observed so far. */
-    @Override
-    public Iterable<BigInteger> results() {
-      return ImmutableList.of(min);
-    }
-  }
-
-  /**
-   * Factory whose {@code create()} hands out a fresh {@link MinBigInts} on every call.
-   *
-   * @deprecated Use {@link Aggregators#MIN_BIGINTS()}
-   */
-  public static AggregatorFactory<BigInteger> MIN_BIGINTS = new AggregatorFactory<BigInteger>() {
-    @Override
-    public Aggregator<BigInteger> create() {
-      return new MinBigInts();
-    }
-  };
-
-  /**
-   * Retains up to {@code arity} of the largest distinct values passed to {@code update} since
-   * the last {@code reset}.
-   *
-   * <p>
-   * The backing set is transient and is only created in {@code reset}, so {@code reset} must be
-   * called before the first {@code update}.
-   *
-   * @deprecated Use {@link Aggregators#MAX_N(int, Class)}
-   */
-  public static class MaxNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
-    private final int arity;
-    private transient SortedSet<V> elements;
-
-    /**
-     * @param arity
-     *          The maximum number of values to retain
-     */
-    public MaxNAggregator(int arity) {
-      this.arity = arity;
-    }
-
-    /** Creates the backing set on first use, otherwise empties it. */
-    @Override
-    public void reset() {
-      if (elements == null) {
-        elements = Sets.newTreeSet();
-      } else {
-        elements.clear();
-      }
-    }
-
-    /**
-     * Offers a value; once {@code arity} values are held, it replaces the current smallest one
-     * only if it is larger and not already retained.
-     */
-    @Override
-    public void update(V value) {
-      if (elements.size() < arity) {
-        elements.add(value);
-      } else if (!elements.isEmpty() && value.compareTo(elements.first()) > 0 && elements.add(value)) {
-        // Evict the smallest element only when the value was really inserted. Evicting first and
-        // then re-adding an already retained value would shrink the set below arity and let a
-        // smaller value in later. The isEmpty() guard covers a non-positive arity.
-        elements.remove(elements.first());
-      }
-    }
-
-    /** Returns an immutable snapshot of the retained values in ascending order. */
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-
-  /**
-   * Retains up to {@code arity} of the smallest distinct values passed to {@code update} since
-   * the last {@code reset}.
-   *
-   * <p>
-   * The backing set is transient and is only created in {@code reset}, so {@code reset} must be
-   * called before the first {@code update}.
-   *
-   * @deprecated Use {@link Aggregators#MIN_N(int, Class)}
-   */
-  public static class MinNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
-    private final int arity;
-    private transient SortedSet<V> elements;
-
-    /**
-     * @param arity
-     *          The maximum number of values to retain
-     */
-    public MinNAggregator(int arity) {
-      this.arity = arity;
-    }
-
-    /** Creates the backing set on first use, otherwise empties it. */
-    @Override
-    public void reset() {
-      if (elements == null) {
-        elements = Sets.newTreeSet();
-      } else {
-        elements.clear();
-      }
-    }
-
-    /**
-     * Offers a value; once {@code arity} values are held, it replaces the current largest one
-     * only if it is smaller and not already retained.
-     */
-    @Override
-    public void update(V value) {
-      if (elements.size() < arity) {
-        elements.add(value);
-      } else if (!elements.isEmpty() && value.compareTo(elements.last()) < 0 && elements.add(value)) {
-        // Evict the largest element only when the value was really inserted. Evicting first and
-        // then re-adding an already retained value would shrink the set below arity and let a
-        // larger value in later. The isEmpty() guard covers a non-positive arity.
-        elements.remove(elements.last());
-      }
-    }
-
-    /** Returns an immutable snapshot of the retained values in ascending order. */
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-
-  /**
-   * Retains the first {@code arity} values passed to {@code update} after a {@code reset} and
-   * ignores the rest.
-   *
-   * @deprecated Use {@link Aggregators#FIRST_N(int)}
-   */
-  public static class FirstNAggregator<V> extends SimpleAggregator<V> {
-    private final int arity;
-    private final List<V> elements;
-
-    /**
-     * @param arity
-     *          The maximum number of values to retain
-     */
-    public FirstNAggregator(int arity) {
-      this.elements = Lists.newArrayList();
-      this.arity = arity;
-    }
-
-    /** Drops every retained value. */
-    @Override
-    public void reset() {
-      elements.clear();
-    }
-
-    /** Appends the value unless {@code arity} values are already held. */
-    @Override
-    public void update(V value) {
-      if (elements.size() >= arity) {
-        return;
-      }
-      elements.add(value);
-    }
-
-    /** Returns an immutable snapshot of the retained values in arrival order. */
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-
-  /**
-   * Retains the most recent {@code arity} values passed to {@code update} after a
-   * {@code reset}.
-   *
-   * @deprecated Use {@link Aggregators#LAST_N(int)}
-   */
-  public static class LastNAggregator<V> extends SimpleAggregator<V> {
-    private final int arity;
-    private final LinkedList<V> elements;
-
-    /**
-     * @param arity
-     *          The maximum number of values to retain
-     */
-    public LastNAggregator(int arity) {
-      this.elements = Lists.newLinkedList();
-      this.arity = arity;
-    }
-
-    /** Drops every retained value. */
-    @Override
-    public void reset() {
-      elements.clear();
-    }
-
-    /** Appends the value, then evicts the oldest one if that pushed the size past {@code arity}. */
-    @Override
-    public void update(V value) {
-      elements.addLast(value);
-      if (arity + 1 == elements.size()) {
-        elements.removeFirst();
-      }
-    }
-
-    /** Returns an immutable snapshot of the retained values, oldest first. */
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-
-  /**
-   * Joins the strings passed to {@code update} with a separator, optionally skipping inputs
-   * that are too long or that would make the joined output too long.
-   *
-   * <p>
-   * The joiner is transient and is only created in {@code reset}, so {@code reset} must be
-   * called before {@code results}.
-   *
-   * @deprecated Use {@link Aggregators#STRING_CONCAT(String, boolean, long, long)}
-   */
-  public static class StringConcatAggregator extends SimpleAggregator<String> {
-    private final String separator;
-    private final boolean skipNulls;
-    private final long maxOutputLength;
-    private final long maxInputLength;
-    // Length of the joined output so far; starts at -separator.length() because the first
-    // element is not preceded by a separator. Only maintained when maxOutputLength > 0.
-    private long currentLength;
-    private final LinkedList<String> list = new LinkedList<String>();
-
-    private transient Joiner joiner;
-
-    /**
-     * Creates an aggregator without any length limits.
-     *
-     * @param separator
-     *          The string placed between joined values
-     * @param skipNulls
-     *          Whether the joiner is configured to skip null values
-     */
-    public StringConcatAggregator(final String separator, final boolean skipNulls) {
-      this.separator = separator;
-      this.skipNulls = skipNulls;
-      this.maxInputLength = 0;
-      this.maxOutputLength = 0;
-    }
-
-    /**
-     * Creates an aggregator with optional length limits; a limit of zero or less is disabled.
-     *
-     * @param separator
-     *          The string placed between joined values
-     * @param skipNull
-     *          Whether the joiner is configured to skip null values
-     * @param maxOutputLength
-     *          Inputs that would push the joined output past this length are dropped
-     * @param maxInputLength
-     *          Inputs longer than this are dropped
-     */
-    public StringConcatAggregator(final String separator, final boolean skipNull, final long maxOutputLength, final long maxInputLength) {
-      this.separator = separator;
-      this.skipNulls = skipNull;
-      this.maxOutputLength = maxOutputLength;
-      this.maxInputLength = maxInputLength;
-      this.currentLength = -separator.length();
-    }
-
-    /** Lazily creates the joiner, then clears the collected values and the length counter. */
-    @Override
-    public void reset() {
-      if (joiner == null) {
-        joiner = skipNulls ? Joiner.on(separator).skipNulls() : Joiner.on(separator);
-      }
-      currentLength = -separator.length();
-      list.clear();
-    }
-
-    /** Collects the value unless one of the configured length limits rejects it. */
-    @Override
-    public void update(final String next) {
-      long length = (next == null) ? 0 : next.length() + separator.length();
-      boolean outputTooLong = maxOutputLength > 0 && currentLength + length > maxOutputLength;
-      // A null input has no length to compare; without the null check this line threw a
-      // NullPointerException for null inputs whenever an input limit was configured.
-      boolean inputTooLong = maxInputLength > 0 && next != null && next.length() > maxInputLength;
-      if (outputTooLong || inputTooLong) {
-        return;
-      }
-      if (maxOutputLength > 0) {
-        currentLength += length;
-      }
-      list.add(next);
-    }
-
-    /** Returns a one-element list holding the collected values joined by the separator. */
-    @Override
-    public Iterable<String> results() {
-      return ImmutableList.of(joiner.join(list));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/CrunchRuntimeException.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/CrunchRuntimeException.java b/crunch/src/main/java/org/apache/crunch/CrunchRuntimeException.java
deleted file mode 100644
index 044f600..0000000
--- a/crunch/src/main/java/org/apache/crunch/CrunchRuntimeException.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-/**
- * An unchecked exception that additionally remembers whether it has already been
- * written to the debug logs, so the Crunch execution engine can track its
- * reporting status. Clients may throw instances of this class from their own
- * {@code DoFn} implementations.
- */
-public class CrunchRuntimeException extends RuntimeException {
-
-  // Starts out false and is flipped once by markLogged().
-  private boolean logged;
-
-  /**
-   * @param msg
-   *          The detail message
-   */
-  public CrunchRuntimeException(String msg) {
-    super(msg);
-  }
-
-  /**
-   * @param e
-   *          The underlying cause
-   */
-  public CrunchRuntimeException(Exception e) {
-    super(e);
-  }
-
-  /**
-   * @param msg
-   *          The detail message
-   * @param e
-   *          The underlying cause
-   */
-  public CrunchRuntimeException(String msg, Exception e) {
-    super(msg, e);
-  }
-
-  /**
-   * Reports whether {@link #markLogged()} has been called on this exception.
-   */
-  public boolean wasLogged() {
-    return logged;
-  }
-
-  /**
-   * Records that this exception has been written to the debug logs.
-   */
-  public void markLogged() {
-    logged = true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/DoFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/DoFn.java b/crunch/src/main/java/org/apache/crunch/DoFn.java
deleted file mode 100644
index 2c6389a..0000000
--- a/crunch/src/main/java/org/apache/crunch/DoFn.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.Serializable;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-/**
- * Base class for all data processing functions in Crunch.
- *
- * <p>
- * Note that all {@code DoFn} instances implement {@link Serializable}, and thus
- * all of their non-transient member variables must implement
- * {@code Serializable} as well. If your DoFn depends on non-serializable
- * classes for data processing, they may be declared as {@code transient} and
- * initialized in the DoFn's {@code initialize} method.
- *
- * @param <S>
- *          The type of the input records
- * @param <T>
- *          The type of the values sent to the {@link Emitter}
- */
-public abstract class DoFn<S, T> implements Serializable {
-  // Transient: not serialized with the function; only assigned through setContext().
-  private transient TaskInputOutputContext<?, ?, ?, ?> context;
-
-  /**
-   * Configure this DoFn. Subclasses may override this method to modify the
-   * configuration of the Job that this DoFn instance belongs to.
-   * 
-   * <p>
-   * Called during the job planning phase by the crunch-client.
-   * </p>
-   * 
-   * @param conf
-   *          The Configuration instance for the Job.
-   */
-  public void configure(Configuration conf) {
-  }
-
-  /**
-   * Initialize this DoFn. This initialization will happen before the actual
-   * {@link #process(Object, Emitter)} is triggered. Subclasses may override
-   * this method to do appropriate initialization.
-   * 
-   * <p>
-   * Called during the setup of the job instance this {@code DoFn} is associated
-   * with.
-   * </p>
-   * 
-   */
-  public void initialize() {
-  }
-
-  /**
-   * Processes the records from a {@link PCollection}.
-   * 
-   * <br/>
-   * <br/>
-   * <b>Note:</b> Crunch can reuse a single input record object whose content
-   * changes on each {@link #process(Object, Emitter)} method call. This
-   * functionality is imposed by Hadoop's <a href=
-   * "http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/Reducer.html"
-   * >Reducer</a> implementation: <i>The framework will reuse the key and value
-   * objects that are passed into the reduce, therefore the application should
-   * clone the objects they want to keep a copy of.</i>
-   * 
-   * @param input
-   *          The input record.
-   * @param emitter
-   *          The emitter to send the output to
-   */
-  public abstract void process(S input, Emitter<T> emitter);
-
-  /**
-   * Called during the cleanup of the MapReduce job this {@code DoFn} is
-   * associated with. Subclasses may override this method to do appropriate
-   * cleanup.
-   * 
-   * @param emitter
-   *          The emitter that was used for output
-   */
-  public void cleanup(Emitter<T> emitter) {
-  }
-
-  /**
-   * Called during setup to pass the {@link TaskInputOutputContext} to this
-   * {@code DoFn} instance.
-   *
-   * @param context
-   *          The context that the protected accessors of this class delegate to
-   */
-  public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-    this.context = context;
-  }
-
-  /**
-   * Returns an estimate of how applying this function to a {@link PCollection}
-   * will cause it to change in size. The optimizer uses these estimates to
-   * decide where to break up dependent MR jobs into separate Map and Reduce
-   * phases in order to minimize I/O.
-   * 
-   * <p>
-   * Subclasses of {@code DoFn} that will substantially alter the size of the
-   * resulting {@code PCollection} should override this method.
-   *
-   * @return The size estimate; this base implementation returns {@code 1.2f}
-   */
-  public float scaleFactor() {
-    return 1.2f;
-  }
-
-  /**
-   * Returns the context last passed to
-   * {@link #setContext(TaskInputOutputContext)}, or {@code null} if none has
-   * been set on this instance.
-   */
-  protected TaskInputOutputContext<?, ?, ?, ?> getContext() {
-    return context;
-  }
-
-  /**
-   * Returns the {@link Configuration} of the current context. Like the other
-   * context-backed accessors below, this dereferences the context directly and
-   * therefore fails with a {@code NullPointerException} if no context has been
-   * set.
-   */
-  protected Configuration getConfiguration() {
-    return context.getConfiguration();
-  }
-
-  /**
-   * Looks up the counter identified by the given enum value in the current
-   * context.
-   */
-  protected Counter getCounter(Enum<?> counterName) {
-    return context.getCounter(counterName);
-  }
-
-  /**
-   * Looks up the counter identified by the given group and counter names in
-   * the current context.
-   */
-  protected Counter getCounter(String groupName, String counterName) {
-    return context.getCounter(groupName, counterName);
-  }
-
-  /**
-   * Increments the counter identified by the given enum value by one.
-   */
-  protected void increment(Enum<?> counterName) {
-    increment(counterName, 1);
-  }
-
-  /**
-   * Increments the counter identified by the given enum value by
-   * {@code value}.
-   */
-  protected void increment(Enum<?> counterName, long value) {
-    getCounter(counterName).increment(value);
-  }
-
-  /**
-   * Forwards to {@code progress()} on the current context.
-   */
-  protected void progress() {
-    context.progress();
-  }
-
-  /**
-   * Returns the {@link TaskAttemptID} reported by the current context.
-   */
-  protected TaskAttemptID getTaskAttemptID() {
-    return context.getTaskAttemptID();
-  }
-
-  /**
-   * Forwards the given status string to {@code setStatus} on the current
-   * context.
-   */
-  protected void setStatus(String status) {
-    context.setStatus(status);
-  }
-
-  /**
-   * Returns the status string reported by the current context.
-   */
-  protected String getStatus() {
-    return context.getStatus();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/Emitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Emitter.java b/crunch/src/main/java/org/apache/crunch/Emitter.java
deleted file mode 100644
index d104a09..0000000
--- a/crunch/src/main/java/org/apache/crunch/Emitter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-/**
- * Interface for writing outputs from a {@link DoFn}.
- *
- * @param <T>
- *          The type of the values this emitter accepts
- */
-public interface Emitter<T> {
-  /**
-   * Write the emitted value to the next stage of the pipeline.
-   * 
-   * @param emitted
-   *          The value to write
-   */
-  void emit(T emitted);
-
-  /**
-   * Flushes any values cached by this emitter. Called during the cleanup stage.
-   */
-  void flush();
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/FilterFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/FilterFn.java b/crunch/src/main/java/org/apache/crunch/FilterFn.java
deleted file mode 100644
index 440f122..0000000
--- a/crunch/src/main/java/org/apache/crunch/FilterFn.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.List;
-
-import org.apache.crunch.fn.FilterFns;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * A {@link DoFn} that keeps or drops each member of a {@link PCollection}
- * according to a boolean test, passing accepted records through unchanged.
- */
-public abstract class FilterFn<T> extends DoFn<T, T> {
-
-  /**
-   * Decides whether the given record is emitted.
-   *
-   * @return true to emit the record, false to drop it
-   */
-  public abstract boolean accept(T input);
-
-  /** Emits the input unchanged when {@link #accept(Object)} approves it. */
-  @Override
-  public void process(T input, Emitter<T> emitter) {
-    if (!accept(input)) {
-      return;
-    }
-    emitter.emit(input);
-  }
-
-  /** Ignores the emitter and forwards to the no-argument {@link #cleanup()}. */
-  @Override
-  public final void cleanup(Emitter<T> emitter) {
-    cleanup();
-  }
-
-  /**
-   * Cleanup hook for the MapReduce job this {@code FilterFn} is associated
-   * with. The default does nothing; subclasses may override it.
-   */
-  public void cleanup() {
-  }
-
-  /** Estimates that half of the input records are kept. */
-  @Override
-  public float scaleFactor() {
-    return 0.5f;
-  }
-
-  /**
-   * Builds a filter that accepts a record only if every given filter accepts it.
-   *
-   * @deprecated Use {@link FilterFns#and(FilterFn...)}
-   */
-  public static <S> FilterFn<S> and(FilterFn<S>... fns) {
-    return new AndFn<S>(fns);
-  }
-
-  /**
-   * Conjunction of filters; lifecycle calls are forwarded to every member.
-   *
-   * @deprecated Use {@link FilterFns#and(FilterFn...)}
-   */
-  public static class AndFn<S> extends FilterFn<S> {
-
-    private final List<FilterFn<S>> fns;
-
-    public AndFn(FilterFn<S>... fns) {
-      this.fns = ImmutableList.copyOf(fns);
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      for (FilterFn<S> member : fns) {
-        member.configure(conf);
-      }
-    }
-
-    @Override
-    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-      for (FilterFn<S> member : fns) {
-        member.setContext(context);
-      }
-    }
-
-    @Override
-    public void initialize() {
-      for (FilterFn<S> member : fns) {
-        member.initialize();
-      }
-    }
-
-    @Override
-    public void cleanup() {
-      for (FilterFn<S> member : fns) {
-        member.cleanup();
-      }
-    }
-
-    /** Stops at the first member that rejects the input. */
-    @Override
-    public boolean accept(S input) {
-      for (FilterFn<S> member : fns) {
-        boolean accepted = member.accept(input);
-        if (!accepted) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    /** Product of the members' scale factors. */
-    @Override
-    public float scaleFactor() {
-      float product = 1.0f;
-      for (FilterFn<S> member : fns) {
-        product = product * member.scaleFactor();
-      }
-      return product;
-    }
-  }
-
-  /**
-   * Builds a filter that accepts a record if at least one given filter accepts it.
-   *
-   * @deprecated Use {@link FilterFns#or(FilterFn...)}
-   */
-  public static <S> FilterFn<S> or(FilterFn<S>... fns) {
-    return new OrFn<S>(fns);
-  }
-
-  /**
-   * Disjunction of filters; lifecycle calls are forwarded to every member.
-   *
-   * @deprecated Use {@link FilterFns#or(FilterFn...)}
-   */
-  public static class OrFn<S> extends FilterFn<S> {
-
-    private final List<FilterFn<S>> fns;
-
-    public OrFn(FilterFn<S>... fns) {
-      this.fns = ImmutableList.copyOf(fns);
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      for (FilterFn<S> member : fns) {
-        member.configure(conf);
-      }
-    }
-
-    @Override
-    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-      for (FilterFn<S> member : fns) {
-        member.setContext(context);
-      }
-    }
-
-    @Override
-    public void initialize() {
-      for (FilterFn<S> member : fns) {
-        member.initialize();
-      }
-    }
-
-    @Override
-    public void cleanup() {
-      for (FilterFn<S> member : fns) {
-        member.cleanup();
-      }
-    }
-
-    /** Stops at the first member that accepts the input. */
-    @Override
-    public boolean accept(S input) {
-      for (FilterFn<S> member : fns) {
-        boolean accepted = member.accept(input);
-        if (accepted) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    /** Sum of the members' scale factors, capped at 1.0. */
-    @Override
-    public float scaleFactor() {
-      float sum = 0.0f;
-      for (FilterFn<S> member : fns) {
-        sum = sum + member.scaleFactor();
-      }
-      return Math.min(1.0f, sum);
-    }
-  }
-
-  /**
-   * Builds a filter that accepts exactly the records the given filter rejects.
-   *
-   * @deprecated Use {@link FilterFns#not(FilterFn)}
-   */
-  public static <S> FilterFn<S> not(FilterFn<S> fn) {
-    return new NotFn<S>(fn);
-  }
-
-  /**
-   * Negation of a filter; lifecycle calls are forwarded to the wrapped filter.
-   *
-   * @deprecated Use {@link FilterFns#not(FilterFn)}
-   */
-  public static class NotFn<S> extends FilterFn<S> {
-
-    private final FilterFn<S> base;
-
-    public NotFn(FilterFn<S> base) {
-      this.base = base;
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      base.configure(conf);
-    }
-
-    @Override
-    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-      base.setContext(context);
-    }
-
-    @Override
-    public void initialize() {
-      base.initialize();
-    }
-
-    @Override
-    public void cleanup() {
-      base.cleanup();
-    }
-
-    @Override
-    public boolean accept(S input) {
-      boolean accepted = base.accept(input);
-      return !accepted;
-    }
-
-    /** Complement of the wrapped filter's scale factor. */
-    @Override
-    public float scaleFactor() {
-      float wrapped = base.scaleFactor();
-      return 1.0f - wrapped;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/GroupingOptions.java b/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
deleted file mode 100644
index 4aa1343..0000000
--- a/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Options that can be passed to a {@code groupByKey} operation in order to
- * exercise finer control over how the partitioning, grouping, and sorting of
- * keys is performed.
- *
- * <p>
- * Instances are created through {@link #builder()}.
- */
-public class GroupingOptions {
-
-  private final Class<? extends Partitioner> partitionerClass;
-  private final Class<? extends RawComparator> groupingComparatorClass;
-  private final Class<? extends RawComparator> sortComparatorClass;
-  private final int numReducers;
-  private final Map<String, String> extraConf;
-  private final Set<SourceTarget<?>> sourceTargets;
-
-  private GroupingOptions(Class<? extends Partitioner> partitionerClass,
-      Class<? extends RawComparator> groupingComparatorClass, Class<? extends RawComparator> sortComparatorClass,
-      int numReducers, Map<String, String> extraConf, Set<SourceTarget<?>> sourceTargets) {
-    this.partitionerClass = partitionerClass;
-    this.groupingComparatorClass = groupingComparatorClass;
-    this.sortComparatorClass = sortComparatorClass;
-    this.numReducers = numReducers;
-    this.extraConf = extraConf;
-    this.sourceTargets = sourceTargets;
-  }
-
-  /**
-   * Returns the requested number of reducers, or zero when the builder was
-   * never given one.
-   */
-  public int getNumReducers() {
-    return numReducers;
-  }
-
-  /** Returns the sort comparator class, or null when none was set. */
-  public Class<? extends RawComparator> getSortComparatorClass() {
-    return sortComparatorClass;
-  }
-
-  /** Returns the grouping comparator class, or null when none was set. */
-  public Class<? extends RawComparator> getGroupingComparatorClass() {
-    return groupingComparatorClass;
-  }
-
-  /** Returns the partitioner class, or null when none was set. */
-  public Class<? extends Partitioner> getPartitionerClass() {
-    return partitionerClass;
-  }
-
-  /** Returns the source targets registered through the builder. */
-  public Set<SourceTarget<?>> getSourceTargets() {
-    return sourceTargets;
-  }
-
-  /**
-   * Applies these options to the given job: every class that was set, the
-   * reducer count when it is positive, and all extra configuration entries.
-   *
-   * @param job
-   *          The job to configure
-   */
-  public void configure(Job job) {
-    if (partitionerClass != null) {
-      job.setPartitionerClass(partitionerClass);
-    }
-    if (groupingComparatorClass != null) {
-      job.setGroupingComparatorClass(groupingComparatorClass);
-    }
-    if (sortComparatorClass != null) {
-      job.setSortComparatorClass(sortComparatorClass);
-    }
-    if (numReducers > 0) {
-      job.setNumReduceTasks(numReducers);
-    }
-    for (Map.Entry<String, String> e : extraConf.entrySet()) {
-      job.getConfiguration().set(e.getKey(), e.getValue());
-    }
-  }
-
-  /**
-   * Reports whether the other options use the same partitioner, grouping
-   * comparator and sort comparator classes and equal extra configuration. The
-   * number of reducers and the source targets are not compared.
-   */
-  public boolean isCompatibleWith(GroupingOptions other) {
-    if (partitionerClass != other.partitionerClass) {
-      return false;
-    }
-    if (groupingComparatorClass != other.groupingComparatorClass) {
-      return false;
-    }
-    if (sortComparatorClass != other.sortComparatorClass) {
-      return false;
-    }
-    if (!extraConf.equals(other.extraConf)) {
-      return false;
-    }
-    return true;
-  }
-
-  /** Returns a new, empty {@link Builder}. */
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  /**
-   * Builder class for creating {@code GroupingOptions} instances.
-   *
-   */
-  public static class Builder {
-    private Class<? extends Partitioner> partitionerClass;
-    private Class<? extends RawComparator> groupingComparatorClass;
-    private Class<? extends RawComparator> sortComparatorClass;
-    private int numReducers;
-    private Map<String, String> extraConf = Maps.newHashMap();
-    private Set<SourceTarget<?>> sourceTargets = Sets.newHashSet();
-
-    public Builder() {
-    }
-
-    public Builder partitionerClass(Class<? extends Partitioner> partitionerClass) {
-      this.partitionerClass = partitionerClass;
-      return this;
-    }
-
-    public Builder groupingComparatorClass(Class<? extends RawComparator> groupingComparatorClass) {
-      this.groupingComparatorClass = groupingComparatorClass;
-      return this;
-    }
-
-    public Builder sortComparatorClass(Class<? extends RawComparator> sortComparatorClass) {
-      this.sortComparatorClass = sortComparatorClass;
-      return this;
-    }
-
-    /**
-     * @throws IllegalArgumentException
-     *           if {@code numReducers} is not positive
-     */
-    public Builder numReducers(int numReducers) {
-      if (numReducers <= 0) {
-        throw new IllegalArgumentException("Invalid number of reducers: " + numReducers);
-      }
-      this.numReducers = numReducers;
-      return this;
-    }
-
-    /** Adds an extra configuration entry, replacing any earlier value for the same key. */
-    public Builder conf(String confKey, String confValue) {
-      this.extraConf.put(confKey, confValue);
-      return this;
-    }
-
-    public Builder sourceTarget(SourceTarget<?> st) {
-      this.sourceTargets.add(st);
-      return this;
-    }
-
-    /**
-     * Creates a {@code GroupingOptions} from the current builder state.
-     */
-    public GroupingOptions build() {
-      // Hand over copies: sharing the builder's own collections would let later conf() or
-      // sourceTarget() calls on a reused builder change options that were already built.
-      Map<String, String> confCopy = Maps.newHashMap(extraConf);
-      Set<SourceTarget<?>> targetsCopy = Sets.newHashSet(sourceTargets);
-      return new GroupingOptions(partitionerClass, groupingComparatorClass, sortComparatorClass,
-          numReducers, confCopy, targetsCopy);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/MapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/MapFn.java b/crunch/src/main/java/org/apache/crunch/MapFn.java
deleted file mode 100644
index dbf172e..0000000
--- a/crunch/src/main/java/org/apache/crunch/MapFn.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-/**
- * A {@link DoFn} that produces exactly one output value for every input
- * record.
- *
- */
-public abstract class MapFn<S, T> extends DoFn<S, T> {
-
-  /**
-   * Converts the given input into an instance of the output type.
-   */
-  public abstract T map(S input);
-
-  /** Emits the result of {@link #map(Object)} for the input. */
-  @Override
-  public void process(S input, Emitter<T> emitter) {
-    T mapped = map(input);
-    emitter.emit(mapped);
-  }
-
-  /** One output per input, so the size estimate is unchanged. */
-  @Override
-  public float scaleFactor() {
-    return 1.0f;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PCollection.java b/crunch/src/main/java/org/apache/crunch/PCollection.java
deleted file mode 100644
index 6f5abf6..0000000
--- a/crunch/src/main/java/org/apache/crunch/PCollection.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.Collection;
-
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-
-/**
- * A representation of an immutable, distributed collection of elements that is
- * the fundamental target of computations in Crunch.
- *
- */
-public interface PCollection<S> {
-  /**
-   * Returns the {@code Pipeline} associated with this PCollection.
-   */
-  Pipeline getPipeline();
-
-  /**
-   * Returns a {@code PCollection} instance that acts as the union of this
-   * {@code PCollection} and the given {@code PCollection}.
-   */
-  PCollection<S> union(PCollection<S> other);
-  
-  /**
-   * Returns a {@code PCollection} instance that acts as the union of this
-   * {@code PCollection} and the input {@code PCollection}s.
-   */
-  PCollection<S> union(PCollection<S>... collections);
-
-  /**
-   * Applies the given doFn to the elements of this {@code PCollection} and
-   * returns a new {@code PCollection} that is the output of this processing.
-   *
-   * @param doFn
-   *          The {@code DoFn} to apply
-   * @param type
-   *          The {@link PType} of the resulting {@code PCollection}
-   * @return a new {@code PCollection}
-   */
-  <T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T> type);
-
-  /**
-   * Applies the given doFn to the elements of this {@code PCollection} and
-   * returns a new {@code PCollection} that is the output of this processing.
-   *
-   * @param name
-   *          An identifier for this processing step, useful for debugging
-   * @param doFn
-   *          The {@code DoFn} to apply
-   * @param type
-   *          The {@link PType} of the resulting {@code PCollection}
-   * @return a new {@code PCollection}
-   */
-  <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type);
-  
-  /**
-   * Applies the given doFn to the elements of this {@code PCollection} and
-   * returns a new {@code PCollection} that is the output of this processing.
-   *
-   * @param name
-   *          An identifier for this processing step, useful for debugging
-   * @param doFn
-   *          The {@code DoFn} to apply
-   * @param type
-   *          The {@link PType} of the resulting {@code PCollection}
-   * @param options
-   *          Optional information that is needed for certain pipeline operations
-   * @return a new {@code PCollection}
-   */
-  <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type,
-      ParallelDoOptions options);
-
-  /**
-   * Similar to the other {@code parallelDo} instance, but returns a
-   * {@code PTable} instance instead of a {@code PCollection}.
-   *
-   * @param doFn
-   *          The {@code DoFn} to apply
-   * @param type
-   *          The {@link PTableType} of the resulting {@code PTable}
-   * @return a new {@code PTable}
-   */
-  <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type);
-
-  /**
-   * Similar to the other {@code parallelDo} instance, but returns a
-   * {@code PTable} instance instead of a {@code PCollection}.
-   *
-   * @param name
-   *          An identifier for this processing step
-   * @param doFn
-   *          The {@code DoFn} to apply
-   * @param type
-   *          The {@link PTableType} of the resulting {@code PTable}
-   * @return a new {@code PTable}
-   */
-  <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type);
-  
-  /**
-   * Similar to the other {@code parallelDo} instance, but returns a
-   * {@code PTable} instance instead of a {@code PCollection}.
-   *
-   * @param name
-   *          An identifier for this processing step
-   * @param doFn
-   *          The {@code DoFn} to apply
-   * @param type
-   *          The {@link PTableType} of the resulting {@code PTable}
-   * @param options
-   *          Optional information that is needed for certain pipeline operations
-   * @return a new {@code PTable}
-   */
-  <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type,
-      ParallelDoOptions options);
-
-  /**
-   * Write the contents of this {@code PCollection} to the given {@code Target},
-   * using the storage format specified by the target.
-   *
-   * @param target
-   *          The target to write to
-   */
-  PCollection<S> write(Target target);
-
-  /**
-   * Write the contents of this {@code PCollection} to the given {@code Target},
-   * using the given {@code Target.WriteMode} to handle existing
-   * targets.
-   * 
-   * @param target
-   *          The target
-   * @param writeMode
-   *          The rule for handling existing outputs at the target location
-   */
-  PCollection<S> write(Target target, Target.WriteMode writeMode);
-  
-  /**
-   * Returns a reference to the data set represented by this PCollection that
-   * may be used by the client to read the data locally.
-   */
-  Iterable<S> materialize();
-
-  /**
-   * @return A {@code PObject} encapsulating an in-memory {@link Collection} containing the values
-   * of this {@code PCollection}.
-   */
-  PObject<Collection<S>> asCollection();
-
-  /**
-   * Returns the {@code PType} of this {@code PCollection}.
-   */
-  PType<S> getPType();
-
-  /**
-   * Returns the {@code PTypeFamily} of this {@code PCollection}.
-   */
-  PTypeFamily getTypeFamily();
-
-  /**
-   * Returns the size of the data represented by this {@code PCollection} in
-   * bytes.
-   */
-  long getSize();
-
-  /**
-   * Returns the number of elements represented by this {@code PCollection}.
-   *
-   * @return An {@code PObject} containing the number of elements in this {@code PCollection}.
-   */
-  PObject<Long> length();
-
-  /**
-   * Returns a shorthand name for this PCollection.
-   */
-  String getName();
-
-  /**
-   * Apply the given filter function to this instance and return the resulting
-   * {@code PCollection}.
-   */
-  PCollection<S> filter(FilterFn<S> filterFn);
-
-  /**
-   * Apply the given filter function to this instance and return the resulting
-   * {@code PCollection}.
-   *
-   * @param name
-   *          An identifier for this processing step
-   * @param filterFn
-   *          The {@code FilterFn} to apply
-   */
-  PCollection<S> filter(String name, FilterFn<S> filterFn);
-
-  /**
-   * Apply the given map function to each element of this instance in order to
-   * create a {@code PTable}.
-   */
-  <K> PTable<K, S> by(MapFn<S, K> extractKeyFn, PType<K> keyType);
-
-  /**
-   * Apply the given map function to each element of this instance in order to
-   * create a {@code PTable}.
-   *
-   * @param name
-   *          An identifier for this processing step
-   * @param extractKeyFn
-   *          The {@code MapFn} to apply
-   */
-  <K> PTable<K, S> by(String name, MapFn<S, K> extractKeyFn, PType<K> keyType);
-
-  /**
-   * Returns a {@code PTable} instance that contains the counts of each unique
-   * element of this PCollection.
-   */
-  PTable<S, Long> count();
-
-  /**
-   * Returns a {@code PObject} of the maximum element of this instance.
-   */
-  PObject<S> max();
-
-  /**
-   * Returns a {@code PObject} of the minimum element of this instance.
-   */
-  PObject<S> min();
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/PGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch/src/main/java/org/apache/crunch/PGroupedTable.java
deleted file mode 100644
index d77ffdb..0000000
--- a/crunch/src/main/java/org/apache/crunch/PGroupedTable.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.crunch.Aggregator;
-
-/**
- * The Crunch representation of a grouped {@link PTable}.
- * 
- */
-public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
-
-  /**
-   * Combines the values of this grouping using the given {@code CombineFn}.
-   * 
-   * @param combineFn
-   *          The combiner function
-   * @return A {@code PTable} where each key has a single value
-   */
-  PTable<K, V> combineValues(CombineFn<K, V> combineFn);
-
-  /**
-   * Combine the values in each group using the given {@link Aggregator}.
-   *
-   * @param aggregator The function to use
-   * @return A {@link PTable} where each group key maps to an aggregated
-   *         value. Group keys may be repeated if an aggregator returns
-   *         more than one value.
-   */
-  PTable<K, V> combineValues(Aggregator<V> aggregator);
-
-  /**
-   * Convert this grouping back into a multimap.
-   * 
-   * @return an ungrouped version of the data in this {@code PGroupedTable}.
-   */
-  PTable<K, V> ungroup();
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/PObject.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PObject.java b/crunch/src/main/java/org/apache/crunch/PObject.java
deleted file mode 100644
index 897a01f..0000000
--- a/crunch/src/main/java/org/apache/crunch/PObject.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-/**
- * A {@code PObject} represents a singleton object value that results from a distributed
- * computation. Computation producing the value is deferred until
- * {@link org.apache.crunch.PObject#getValue()} is called.
- *
- * @param <T> The type of value encapsulated by this {@code PObject}.
- */
-public interface PObject<T> {
-  /**
-   * Gets the value associated with this {@code PObject}.  Calling this method will trigger
-   * whatever computation is necessary to obtain the value and block until that computation
-   * succeeds.
-   *
-   * @return The value associated with this {@code PObject}.
-   */
-  T getValue();
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/PTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PTable.java b/crunch/src/main/java/org/apache/crunch/PTable.java
deleted file mode 100644
index 8df9853..0000000
--- a/crunch/src/main/java/org/apache/crunch/PTable.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-
-/**
- * A sub-interface of {@code PCollection} that represents an immutable,
- * distributed multi-map of keys and values.
- *
- */
-public interface PTable<K, V> extends PCollection<Pair<K, V>> {
-
-  /**
-   Returns a {@code PTable} instance that acts as the union of this
-   * {@code PTable} and the other {@code PTable}s.
-   */
-  PTable<K, V> union(PTable<K, V> other);
-  
-  /**
-   * Returns a {@code PTable} instance that acts as the union of this
-   * {@code PTable} and the input {@code PTable}s.
-   */
-  PTable<K, V> union(PTable<K, V>... others);
-
-  /**
-   * Performs a grouping operation on the keys of this table.
-   *
-   * @return a {@code PGroupedTable} instance that represents the grouping
-   */
-  PGroupedTable<K, V> groupByKey();
-
-  /**
-   * Performs a grouping operation on the keys of this table, using the given
-   * number of partitions.
-   *
-   * @param numPartitions
-   *          The number of partitions for the data.
-   * @return a {@code PGroupedTable} instance that represents this grouping
-   */
-  PGroupedTable<K, V> groupByKey(int numPartitions);
-
-  /**
-   * Performs a grouping operation on the keys of this table, using the
-   * additional {@code GroupingOptions} to control how the grouping is executed.
-   *
-   * @param options
-   *          The grouping options to use
-   * @return a {@code PGroupedTable} instance that represents the grouping
-   */
-  PGroupedTable<K, V> groupByKey(GroupingOptions options);
-
-  /**
-   * Writes this {@code PTable} to the given {@code Target}.
-   */
-  PTable<K, V> write(Target target);
-
-  /**
-   * Writes this {@code PTable} to the given {@code Target}, using the
-   * given {@code Target.WriteMode} to handle existing targets.
-   */
-  PTable<K, V> write(Target target, Target.WriteMode writeMode);
-
-  /**
-   * Returns the {@code PTableType} of this {@code PTable}.
-   */
-  PTableType<K, V> getPTableType();
-
-  /**
-   * Returns the {@code PType} of the key.
-   */
-  PType<K> getKeyType();
-
-  /**
-   * Returns the {@code PType} of the value.
-   */
-  PType<V> getValueType();
-
-  /**
-   * Aggregate all of the values with the same key into a single key-value pair
-   * in the returned PTable.
-   */
-  PTable<K, Collection<V>> collectValues();
-
-  /**
-   * Apply the given filter function to this instance and return the resulting
-   * {@code PTable}.
-   */
-  PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn);
-  
-  /**
-   * Apply the given filter function to this instance and return the resulting
-   * {@code PTable}.
-   *
-   * @param name
-   *          An identifier for this processing step
-   * @param filterFn
-   *          The {@code FilterFn} to apply
-   */
-  PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn);
-  
-  /**
-   * Returns a PTable made up of the pairs in this PTable with the largest value
-   * field.
-   *
-   * @param count
-   *          The number of pairs to return
-   */
-  PTable<K, V> top(int count);
-
-  /**
-   * Returns a PTable made up of the pairs in this PTable with the smallest
-   * value field.
-   *
-   * @param count
-   *          The number of pairs to return
-   */
-  PTable<K, V> bottom(int count);
-
-  /**
-   * Perform an inner join on this table and the one passed in as an argument on
-   * their common keys.
-   */
-  <U> PTable<K, Pair<V, U>> join(PTable<K, U> other);
-
-  /**
-   * Co-group operation with the given table on common keys.
-   */
-  <U> PTable<K, Pair<Collection<V>, Collection<U>>> cogroup(PTable<K, U> other);
-
-  /**
-   * Returns a {@link PCollection} made up of the keys in this PTable.
-   */
-  PCollection<K> keys();
-
-  /**
-   * Returns a {@link PCollection} made up of the values in this PTable.
-   */
-  PCollection<V> values();
-
-  /**
-   * Returns a Map<K, V> made up of the keys and values in this PTable.
-   * <p>
-   * <b>Note:</b> The contents of the returned map may not be exactly the same
-   * as this PTable, as a PTable is a multi-map (i.e. can contain multiple
-   * values for a single key).
-   */
-  Map<K, V> materializeToMap();
-
-  /**
-   * Returns a {@link PObject} encapsulating a {@link Map} made up of the keys and values in this
-   * {@code PTable}.
-   * <p><b>Note:</b>The contents of the returned map may not be exactly the same as this PTable,
-   * as a PTable is a multi-map (i.e. can contain multiple values for a single key).
-   * </p>
-   *
-   * @return The {@code PObject} encapsulating a {@code Map} made up of the keys and values in
-   * this {@code PTable}.
-   */
-  PObject<Map<K, V>> asMap();
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/Pair.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Pair.java b/crunch/src/main/java/org/apache/crunch/Pair.java
deleted file mode 100644
index fd058b6..0000000
--- a/crunch/src/main/java/org/apache/crunch/Pair.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * A convenience class for two-element {@link Tuple}s.
- */
-public class Pair<K, V> implements Tuple, Comparable<Pair<K, V>> {
-
-  private final K first;
-  private final V second;
-
-  public static <T, U> Pair<T, U> of(T first, U second) {
-    return new Pair<T, U>(first, second);
-  }
-
-  public Pair(K first, V second) {
-    this.first = first;
-    this.second = second;
-  }
-
-  public K first() {
-    return first;
-  }
-
-  public V second() {
-    return second;
-  }
-
-  public Object get(int index) {
-    switch (index) {
-    case 0:
-      return first;
-    case 1:
-      return second;
-    default:
-      throw new ArrayIndexOutOfBoundsException();
-    }
-  }
-
-  public int size() {
-    return 2;
-  }
-
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(first).append(second).toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    Pair<?, ?> other = (Pair<?, ?>) obj;
-    return (first == other.first || (first != null && first.equals(other.first)))
-        && (second == other.second || (second != null && second.equals(other.second)));
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder("[");
-    sb.append(first).append(",").append(second).append("]");
-    return sb.toString();
-  }
-
-  private int cmp(Object lhs, Object rhs) {
-    if (lhs == rhs) {
-      return 0;
-    } else if (lhs != null && Comparable.class.isAssignableFrom(lhs.getClass())) {
-      return ((Comparable) lhs).compareTo(rhs);
-    }
-    return (lhs == null ? 0 : lhs.hashCode()) - (rhs == null ? 0 : rhs.hashCode());
-  }
-
-  @Override
-  public int compareTo(Pair<K, V> o) {
-    int diff = cmp(first, o.first);
-    if (diff == 0) {
-      diff = cmp(second, o.second);
-    }
-    return diff;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java b/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java
deleted file mode 100644
index 2407b3a..0000000
--- a/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.Collections;
-import java.util.Set;
-
-import com.google.common.collect.Sets;
-
-/**
- * Container class that includes optional information about a {@code parallelDo} operation
- * applied to a {@code PCollection}. Primarily used within the Crunch framework
- * itself for certain types of advanced processing operations, such as in-memory joins
- * that require reading a file from the filesystem into a {@code DoFn}.
- */
-public class ParallelDoOptions {
-  private final Set<SourceTarget<?>> sourceTargets;
-  
-  private ParallelDoOptions(Set<SourceTarget<?>> sourceTargets) {
-    this.sourceTargets = sourceTargets;
-  }
-  
-  public Set<SourceTarget<?>> getSourceTargets() {
-    return sourceTargets;
-  }
-  
-  public static Builder builder() {
-    return new Builder();
-  }
-  
-  public static class Builder {
-    private Set<SourceTarget<?>> sourceTargets;
-    
-    public Builder() {
-      this.sourceTargets = Sets.newHashSet();
-    }
-    
-    public Builder sourceTargets(SourceTarget<?>... sourceTargets) {
-      Collections.addAll(this.sourceTargets, sourceTargets);
-      return this;
-    }
-    
-    public ParallelDoOptions build() {
-      return new ParallelDoOptions(sourceTargets);
-    }
-  }
-}