You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:09 UTC

[07/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
deleted file mode 100644
index a3d30d2..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import java.util.List;
-
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.Lists;
-
-/**
- * Used to perform the last step of an inner join.
- * 
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
- */
-public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
-
-  private transient K lastKey;
-  private transient List<U> leftValues;
-
-  public InnerJoinFn(PType<K> keyType, PType<U> leftValueType) {
-    super(keyType, leftValueType);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void initialize() {
-    super.initialize();
-    lastKey = null;
-    this.leftValues = Lists.newArrayList();
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
-    if (!key.equals(lastKey)) {
-      lastKey = keyType.getDetachedValue(key);
-      leftValues.clear();
-    }
-    if (id == 0) { // from left
-      for (Pair<U, V> pair : pairs) {
-        if (pair.first() != null)
-          leftValues.add(leftValueType.getDetachedValue(pair.first()));
-      }
-    } else { // from right
-      for (Pair<U, V> pair : pairs) {
-        for (U u : leftValues) {
-          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
-        }
-      }
-    }
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public String getJoinType() {
-    return "innerJoin";
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
deleted file mode 100644
index 99aea5a..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PType;
-
-/**
- * Base {@link org.apache.crunch.DoFn} for the join implementations. Each input
- * record holds a (key, side id) pair together with the values grouped under
- * it; {@code process} unpacks the record and hands the pieces to {@code join}.
- *
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
- */
-public abstract class JoinFn<K, U, V> extends
-    DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>, Pair<K, Pair<U, V>>> {
-
-  protected PType<K> keyType;
-  protected PType<U> leftValueType;
-
-  /**
-   * Instantiate with the PTypes of the join key and of the left side's values
-   * (used for creating deep copies of keys and values).
-   *
-   * @param keyType The PType of the value used as the key of the join
-   * @param leftValueType The PType of the value type of the left side of the join
-   */
-  public JoinFn(PType<K> keyType, PType<U> leftValueType) {
-    this.keyType = keyType;
-    this.leftValueType = leftValueType;
-  }
-
-  /** Initializes both PTypes with this function's configuration. */
-  @Override
-  public void initialize() {
-    keyType.initialize(getConfiguration());
-    leftValueType.initialize(getConfiguration());
-  }
-
-  /** @return The name of this join type (e.g. innerJoin, leftOuterJoin). */
-  public abstract String getJoinType();
-
-  /**
-   * Performs the actual joining.
-   *
-   * @param key The key for this grouping of values.
-   * @param id The side that this group of values is from (0 -> left, 1 -> right).
-   * @param pairs The group of values associated with this key and id pair.
-   * @param emitter The emitter to send the output to.
-   */
-  public abstract void join(K key, int id, Iterable<Pair<U, V>> pairs,
-      Emitter<Pair<K, Pair<U, V>>> emitter);
-
-  /**
-   * Unpacks the input record into key, side id and grouped values, then
-   * delegates to {@code join}.
-   *
-   * @param input The input record.
-   * @param emitter The emitter to send the output to.
-   */
-  @Override
-  public void process(Pair<Pair<K, Integer>, Iterable<Pair<U, V>>> input,
-      Emitter<Pair<K, Pair<U, V>>> emitter) {
-    Pair<K, Integer> keyAndSide = input.first();
-    join(keyAndSide.first(), keyAndSide.second(), input.second(), emitter);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java b/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
deleted file mode 100644
index 6efeccb..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.io.BinaryData;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.writable.TupleWritable;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-/**
- * Utilities that are useful in joining multiple data sets via a MapReduce.
- * 
- */
-public class JoinUtils {
-
-  public static Class<? extends Partitioner> getPartitionerClass(PTypeFamily typeFamily) {
-    if (typeFamily == WritableTypeFamily.getInstance()) {
-      return TupleWritablePartitioner.class;
-    } else {
-      return AvroIndexedRecordPartitioner.class;
-    }
-  }
-
-  public static Class<? extends RawComparator> getGroupingComparator(PTypeFamily typeFamily) {
-    if (typeFamily == WritableTypeFamily.getInstance()) {
-      return TupleWritableComparator.class;
-    } else {
-      return AvroPairGroupingComparator.class;
-    }
-  }
-
-  public static class TupleWritablePartitioner extends Partitioner<TupleWritable, Writable> {
-    @Override
-    public int getPartition(TupleWritable key, Writable value, int numPartitions) {
-      return (Math.abs(key.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
-    }
-  }
-
-  public static class TupleWritableComparator implements RawComparator<TupleWritable> {
-
-    private DataInputBuffer buffer = new DataInputBuffer();
-    private TupleWritable key1 = new TupleWritable();
-    private TupleWritable key2 = new TupleWritable();
-
-    @Override
-    public int compare(TupleWritable o1, TupleWritable o2) {
-      return ((WritableComparable) o1.get(0)).compareTo((WritableComparable) o2.get(0));
-    }
-
-    @Override
-    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-      try {
-        buffer.reset(b1, s1, l1);
-        key1.readFields(buffer);
-
-        buffer.reset(b2, s2, l2);
-        key2.readFields(buffer);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-
-      return compare(key1, key2);
-    }
-  }
-
-  public static class AvroIndexedRecordPartitioner<K, V> extends Partitioner<AvroKey<K>, AvroValue<V>> {
-    @Override
-    public int getPartition(AvroKey<K> key, AvroValue<V> value, int numPartitions) {
-      IndexedRecord record = (IndexedRecord) key.datum();
-      return (Math.abs(record.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
-    }
-  }
-
-  public static class AvroPairGroupingComparator<T> extends Configured implements RawComparator<AvroWrapper<T>> {
-    private Schema schema;
-
-    @Override
-    public void setConf(Configuration conf) {
-      super.setConf(conf);
-      if (conf != null) {
-        Schema mapOutputSchema = AvroJob.getMapOutputSchema(conf);
-        Schema keySchema = org.apache.avro.mapred.Pair.getKeySchema(mapOutputSchema);
-        schema = keySchema.getFields().get(0).schema();
-      }
-    }
-
-    @Override
-    public int compare(AvroWrapper<T> x, AvroWrapper<T> y) {
-      return ReflectData.get().compare(x.datum(), y.datum(), schema);
-    }
-
-    @Override
-    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-      return BinaryData.compare(b1, s1, l1, b2, s2, l2, schema);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
deleted file mode 100644
index 731c496..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import java.util.List;
-
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.Lists;
-
-/**
- * Used to perform the last step of an left outer join.
- * 
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
- */
-public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
-
-  private transient int lastId;
-  private transient K lastKey;
-  private transient List<U> leftValues;
-
-  public LeftOuterJoinFn(PType<K> keyType, PType<U> leftValueType) {
-    super(keyType, leftValueType);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void initialize() {
-    super.initialize();
-    lastId = 1;
-    lastKey = null;
-    this.leftValues = Lists.newArrayList();
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
-    if (!key.equals(lastKey)) {
-      // Make sure that left side always gets emitted.
-      if (0 == lastId) {
-        for (U u : leftValues) {
-          emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
-        }
-      }
-      lastKey = keyType.getDetachedValue(key);
-      leftValues.clear();
-    }
-    if (id == 0) {
-      for (Pair<U, V> pair : pairs) {
-        if (pair.first() != null)
-          leftValues.add(leftValueType.getDetachedValue(pair.first()));
-      }
-    } else {
-      for (Pair<U, V> pair : pairs) {
-        for (U u : leftValues) {
-          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
-        }
-      }
-    }
-
-    lastId = id;
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void cleanup(Emitter<Pair<K, Pair<U, V>>> emitter) {
-    if (0 == lastId) {
-      for (U u : leftValues) {
-        emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
-      }
-    }
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public String getJoinType() {
-    return "leftOuterJoin";
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
deleted file mode 100644
index 56476c1..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import java.io.IOException;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.ParallelDoOptions;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.materialize.MaterializableIterable;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.util.DistCache;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-/**
- * Utility for doing map side joins on a common key between two {@link PTable}s.
- * <p>
- * A map side join is an optimized join which doesn't use a reducer; instead,
- * the right side of the join is loaded into memory and the join is performed in
- * a mapper. This style of join has the important implication that the output of
- * the join is not sorted, unlike the output of a conventional (reducer-based)
- * join.
- * <p>
- * <b>Note:</b> This utility is only supported when running with a
- * {@code MRPipeline} as the pipeline.
- */
-public class MapsideJoin {
-
-  /**
-   * Join two tables using a map side join. The right-side table will be loaded
-   * fully in memory, so this method should only be used if the right side
-   * table's contents can fit in the memory allocated to mappers. The join
-   * performed by this method is an inner join.
-   * <p>
-   * When the materialized right side is a {@link MaterializableIterable}, its
-   * path is shipped through the distributed cache and read back in each task;
-   * otherwise the join is performed against an in-memory copy of the iterable.
-   * 
-   * @param left
-   *          The left-side table of the join
-   * @param right
-   *          The right-side table of the join, whose contents will be fully
-   *          read into memory
-   * @return A table keyed on the join key, containing pairs of joined values
-   */
-  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
-    PTypeFamily tf = left.getTypeFamily();
-    Iterable<Pair<K, V>> iterable = right.materialize();
-
-    if (iterable instanceof MaterializableIterable) {
-      MaterializableIterable<Pair<K, V>> mi = (MaterializableIterable<Pair<K, V>>) iterable;
-      MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(),
-          right.getPType());
-      ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
-      if (mi.isSourceTarget()) {
-        optionsBuilder.sourceTargets((SourceTarget) mi.getSource());
-      }
-      return left.parallelDo("mapjoin", mapJoinDoFn,
-          tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())),
-          optionsBuilder.build());
-    } else { // in-memory pipeline
-      return left.parallelDo(new InMemoryJoinFn<K, U, V>(iterable),
-          tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())));
-    }
-  }
-
-  /** Joins each left-side record against a right side that is already held in memory. */
-  static class InMemoryJoinFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
-
-    private Multimap<K, V> joinMap;
-    
-    public InMemoryJoinFn(Iterable<Pair<K, V>> iterable) {
-      // A list-backed multimap keeps duplicate (key, value) entries, so repeated
-      // right-side rows yield repeated join rows, consistent with MapsideJoinDoFn.
-      // A HashMultimap would silently collapse them.
-      joinMap = ArrayListMultimap.create();
-      for (Pair<K, V> joinPair : iterable) {
-        joinMap.put(joinPair.first(), joinPair.second());
-      }
-    }
-    
-    /** Emits one (key, (left value, right value)) record per right-side value stored under the key. */
-    @Override
-    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
-      K key = input.first();
-      U value = input.second();
-      for (V joinValue : joinMap.get(key)) {
-        Pair<U, V> valuePair = Pair.of(value, joinValue);
-        emitter.emit(Pair.of(key, valuePair));
-      }
-    }
-  }
-  
-  /**
-   * Joins each left-side record against a right side that is registered as a
-   * distributed-cache file and loaded into memory in {@code initialize}.
-   */
-  static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
-
-    private String inputPath;
-    private PType<Pair<K, V>> ptype;
-    private Multimap<K, V> joinMap;
-
-    public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
-      this.inputPath = inputPath;
-      this.ptype = ptype;
-    }
-
-    /**
-     * @return the cache-file path resolved for {@code inputPath}
-     * @throws CrunchRuntimeException if no cache file is found for the path
-     */
-    private Path getCacheFilePath() {
-      Path local = DistCache.getPathToCacheFile(new Path(inputPath), getConfiguration());
-      if (local == null) {
-        throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
-      }
-      return local;
-    }
-
-    /** Registers the right-side data as a cache file on the given configuration. */
-    @Override
-    public void configure(Configuration conf) {
-      DistCache.addCacheFile(new Path(inputPath), conf);
-    }
-    
-    /** Reads the whole right side from the cache file into the in-memory join map. */
-    @Override
-    public void initialize() {
-      super.initialize();
-
-      ReadableSourceTarget<Pair<K, V>> sourceTarget = ptype.getDefaultFileSource(
-          getCacheFilePath());
-      Iterable<Pair<K, V>> iterable = null;
-      try {
-        iterable = sourceTarget.read(getConfiguration());
-      } catch (IOException e) {
-        throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
-      }
-
-      joinMap = ArrayListMultimap.create();
-      for (Pair<K, V> joinPair : iterable) {
-        joinMap.put(joinPair.first(), joinPair.second());
-      }
-    }
-
-    /** Emits one (key, (left value, right value)) record per right-side value stored under the key. */
-    @Override
-    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
-      K key = input.first();
-      U value = input.second();
-      for (V joinValue : joinMap.get(key)) {
-        Pair<U, V> valuePair = Pair.of(value, joinValue);
-        emitter.emit(Pair.of(key, valuePair));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
deleted file mode 100644
index 2789d40..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import java.util.List;
-
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.Lists;
-
-/**
- * Used to perform the last step of an right outer join.
- * 
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
- */
-public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
-
-  private transient K lastKey;
-  private transient List<U> leftValues;
-
-  public RightOuterJoinFn(PType<K> keyType, PType<U> leftValueType) {
-    super(keyType, leftValueType);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void initialize() {
-    super.initialize();
-    lastKey = null;
-    this.leftValues = Lists.newArrayList();
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
-    if (!key.equals(lastKey)) {
-      lastKey = keyType.getDetachedValue(key);
-      leftValues.clear();
-    }
-    if (id == 0) {
-      for (Pair<U, V> pair : pairs) {
-        if (pair.first() != null)
-          leftValues.add(leftValueType.getDetachedValue(pair.first()));
-      }
-    } else {
-      for (Pair<U, V> pair : pairs) {
-        // Make sure that right side gets emitted.
-        if (leftValues.isEmpty()) {
-          leftValues.add(null);
-        }
-
-        for (U u : leftValues) {
-          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
-        }
-      }
-    }
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public String getJoinType() {
-    return "rightOuterJoin";
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/join/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/package-info.java b/crunch/src/main/java/org/apache/crunch/lib/join/package-info.java
deleted file mode 100644
index f1ad9f1..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/join/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Inner and outer joins on collections.
- */
-package org.apache.crunch.lib.join;

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/package-info.java b/crunch/src/main/java/org/apache/crunch/lib/package-info.java
deleted file mode 100644
index 2695787..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Joining, sorting, aggregating, and other commonly used functionality.
- */
-package org.apache.crunch.lib;

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java b/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java
deleted file mode 100644
index ae7f49a..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.sort;
-
-import java.util.Arrays;
-
-import org.apache.avro.Schema;
-import org.apache.avro.io.BinaryData;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.crunch.lib.Sort.ColumnOrder;
-import org.apache.crunch.lib.Sort.Order;
-import org.apache.crunch.types.writable.TupleWritable;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-
-/**
- * A collection of {@code RawComparator<T>} implementations that are used by Crunch's {@code Sort} library.
- */
-public class Comparators {
-
-  /**
-   * Flips the sign of a comparison result. Plain negation is not used because
-   * {@code -Integer.MIN_VALUE} is still negative.
-   */
-  private static int reversed(int cmp) {
-    return cmp < 0 ? 1 : (cmp > 0 ? -1 : 0);
-  }
-  
-  /** Reverses the ordering of the Writable comparator obtained for the job's map output key class. */
-  public static class ReverseWritableComparator<T> extends Configured implements RawComparator<T> {
-
-    private RawComparator<T> comparator;
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void setConf(Configuration conf) {
-      super.setConf(conf);
-      if (conf != null) {
-        JobConf jobConf = new JobConf(conf);
-        comparator = WritableComparator.get(jobConf.getMapOutputKeyClass().asSubclass(WritableComparable.class));
-      }
-    }
-
-    @Override
-    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
-      return reversed(comparator.compare(arg0, arg1, arg2, arg3, arg4, arg5));
-    }
-
-    @Override
-    public int compare(T o1, T o2) {
-      return reversed(comparator.compare(o1, o2));
-    }
-  }
-
-  /** Reverses the Avro ordering defined by the schema stored under {@code crunch.schema}. */
-  public static class ReverseAvroComparator<T> extends Configured implements RawComparator<AvroKey<T>> {
-
-    private Schema schema;
-
-    @Override
-    public void setConf(Configuration conf) {
-      super.setConf(conf);
-      if (conf != null) {
-        schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
-      }
-    }
-
-    @Override
-    public int compare(AvroKey<T> o1, AvroKey<T> o2) {
-      return reversed(ReflectData.get().compare(o1.datum(), o2.datum(), schema));
-    }
-
-    @Override
-    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
-      return reversed(BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema));
-    }
-  }
-
-  /**
-   * Compares {@code TupleWritable}s position by position, following the
-   * per-column orders stored in the configuration under {@code crunch.ordering}.
-   */
-  public static class TupleWritableComparator extends WritableComparator implements Configurable {
-
-    private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
-
-    private Configuration conf;
-    private ColumnOrder[] columnOrders;
-
-    public TupleWritableComparator() {
-      super(TupleWritable.class, true);
-    }
-
-    /** Stores the orders as a comma-separated list of order names. */
-    public static void configureOrdering(Configuration conf, Order... orders) {
-      conf.set(CRUNCH_ORDERING_PROPERTY,
-          Joiner.on(",").join(Iterables.transform(Arrays.asList(orders), new Function<Order, String>() {
-            @Override
-            public String apply(Order o) {
-              return o.name();
-            }
-          })));
-    }
-
-    /** Stores the column orders as a comma-separated list of {@code column;ORDER} entries. */
-    public static void configureOrdering(Configuration conf, ColumnOrder... columnOrders) {
-      conf.set(CRUNCH_ORDERING_PROPERTY,
-          Joiner.on(",").join(Iterables.transform(Arrays.asList(columnOrders), new Function<ColumnOrder, String>() {
-            @Override
-            public String apply(ColumnOrder o) {
-              return o.column() + ";" + o.order().name();
-            }
-          })));
-    }
-
-    /**
-     * Walks the configured columns in order and returns the first non-zero
-     * comparison, sign-adjusted for the column's order. A tuple that has a
-     * value at a position sorts after one that does not (before, when
-     * descending). Columns with any other order are skipped.
-     */
-    @Override
-    public int compare(WritableComparable a, WritableComparable b) {
-      TupleWritable ta = (TupleWritable) a;
-      TupleWritable tb = (TupleWritable) b;
-      for (int index = 0; index < columnOrders.length; index++) {
-        int order = 1;
-        if (columnOrders[index].order() == Order.ASCENDING) {
-          order = 1;
-        } else if (columnOrders[index].order() == Order.DESCENDING) {
-          order = -1;
-        } else { // ignore
-          continue;
-        }
-        if (!ta.has(index) && !tb.has(index)) {
-          continue;
-        } else if (ta.has(index) && !tb.has(index)) {
-          return order;
-        } else if (!ta.has(index) && tb.has(index)) {
-          return -order;
-        } else {
-          Writable v1 = ta.get(index);
-          Writable v2 = tb.get(index);
-          if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
-            int cmp;
-            if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
-              cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
-            } else {
-              // Compare the hash codes explicitly; subtracting them can overflow
-              // and report the wrong sign.
-              int h1 = v1.hashCode();
-              int h2 = v2.hashCode();
-              cmp = (h1 < h2) ? -1 : ((h1 == h2) ? 0 : 1);
-            }
-            if (cmp != 0) {
-              // order is +1 or -1 here; flip via reversed() so that a result of
-              // Integer.MIN_VALUE cannot keep its sign.
-              return order > 0 ? cmp : reversed(cmp);
-            }
-          }
-        }
-      }
-      return 0; // ordering using specified cols found no differences
-    }
-
-    @Override
-    public Configuration getConf() {
-      return conf;
-    }
-
-    /**
-     * Parses the ordering property into {@code columnOrders}. Accepts both
-     * formats written by the {@code configureOrdering} overloads:
-     * {@code column;ORDER} entries and bare {@code ORDER} names.
-     */
-    @Override
-    public void setConf(Configuration conf) {
-      this.conf = conf;
-      if (conf != null) {
-        String ordering = conf.get(CRUNCH_ORDERING_PROPERTY);
-        String[] columnOrderNames = ordering.split(",");
-        columnOrders = new ColumnOrder[columnOrderNames.length];
-        for (int i = 0; i < columnOrders.length; i++) {
-          String[] split = columnOrderNames[i].split(";");
-          if (split.length == 1) {
-            // Bare order name, as written by configureOrdering(Configuration, Order...).
-            // compare() never reads column(), so the number below is positional only.
-            // NOTE(review): confirm i + 1 matches ColumnOrder's column numbering.
-            columnOrders[i] = ColumnOrder.by(i + 1, Order.valueOf(split[0]));
-          } else {
-            int column = Integer.parseInt(split[0]);
-            Order order = Order.valueOf(split[1]);
-            columnOrders[i] = ColumnOrder.by(column, order);
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java b/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java
deleted file mode 100644
index be218f6..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.sort;
-
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.lib.Sort.ColumnOrder;
-import org.apache.crunch.lib.Sort.Order;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.TupleFactory;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-
-import com.google.common.collect.Lists;
-
-/**
- * A set of {@code DoFn}s that are used by Crunch's {@code Sort} library.
- */
-public class SortFns {
-
-  /**
-   * A {@code MapFn} that returns the element stored at one fixed position of the input
-   * {@code Tuple}.
-   */
-  public static class SingleKeyFn<V extends Tuple, K> extends MapFn<V, K> {
-    private final int index;
-
-    /**
-     * @param index the position handed to {@code Tuple.get} for every input
-     */
-    public SingleKeyFn(int index) {
-      this.index = index;
-    }
-
-    @Override
-    public K map(V input) {
-      Object key = input.get(index);
-      return (K) key;
-    }
-  }
-
-  /**
-   * A {@code MapFn} that gathers several positions of the input {@code Tuple} and hands them to
-   * a {@code TupleFactory} to build the composite key.
-   */
-  public static class TupleKeyFn<V extends Tuple, K extends Tuple> extends MapFn<V, K> {
-    private final int[] indices;
-    private final TupleFactory tupleFactory;
-
-    /**
-     * @param indices the input positions to copy, in key order
-     * @param tupleFactory the factory used to assemble the key from the copied values
-     */
-    public TupleKeyFn(int[] indices, TupleFactory tupleFactory) {
-      this.indices = indices;
-      this.tupleFactory = tupleFactory;
-    }
-
-    @Override
-    public K map(V input) {
-      Object[] extracted = new Object[indices.length];
-      int slot = 0;
-      for (int sourceIndex : indices) {
-        extracted[slot] = input.get(sourceIndex);
-        slot++;
-      }
-      return (K) tupleFactory.makeTuple(extracted);
-    }
-  }
-
-  /**
-   * A {@code MapFn} that copies several positions of the input {@code Tuple} into the fields of
-   * a new Avro {@code GenericRecord}.
-   */
-  public static class AvroGenericFn<V extends Tuple> extends MapFn<V, GenericRecord> {
-
-    private final int[] indices;
-    // The schema is carried as its JSON text; the parsed form is transient and is rebuilt in
-    // initialize().
-    private final String schemaJson;
-    private transient Schema schema;
-
-    /**
-     * @param indices the input positions to copy; position {@code i} of this array fills record
-     *        field {@code i}
-     * @param schema the schema of the records to emit (only its JSON text is retained)
-     */
-    public AvroGenericFn(int[] indices, Schema schema) {
-      this.indices = indices;
-      this.schemaJson = schema.toString();
-    }
-
-    /** Re-parses the stored JSON text into the {@code schema} field. */
-    @Override
-    public void initialize() {
-      Schema.Parser parser = new Schema.Parser();
-      this.schema = parser.parse(schemaJson);
-    }
-
-    @Override
-    public GenericRecord map(V input) {
-      GenericRecord record = new GenericData.Record(schema);
-      int field = 0;
-      for (int sourceIndex : indices) {
-        record.put(field, input.get(sourceIndex));
-        field++;
-      }
-      return record;
-    }
-  }
-
-  /**
-   * Builds an Avro record schema with one field per entry of {@code orders}, taking each field's
-   * name and schema from the same position of {@code ptype} and its sort order from the
-   * corresponding {@code ColumnOrder}.
-   *
-   * @param ptype the tuple type to describe; it is cast to {@code AvroType}
-   * @param orders the per-field orderings; entry {@code i} applies to field {@code i}
-   * @return a new record schema in the {@code crunch} namespace with a randomly generated name
-   */
-  public static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
-    // A random UUID (with '-' replaced by 'x') keeps the record name distinct between calls.
-    String recordName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
-    Schema orderedSchema = Schema.createRecord(recordName, "", "crunch", false);
-    List<Schema.Field> orderedFields = Lists.newArrayList();
-    AvroType<S> parentType = (AvroType<S>) ptype;
-    Schema parentSchema = parentType.getSchema();
-
-    int position = 0;
-    for (ColumnOrder columnOrder : orders) {
-      AvroType<?> fieldType = (AvroType<?>) ptype.getSubTypes().get(position);
-      Schema fieldSchema = fieldType.getSchema();
-      String fieldName = parentSchema.getFields().get(position).name();
-      // Original author's note: avro sorting of strings is inverted relative to how sorting works
-      // for WritableComparable Text instances; this mapping is meant to make the two consistent.
-      // Anything other than DESCENDING is mapped to ASCENDING.
-      Schema.Field.Order avroOrder;
-      if (columnOrder.order() == Order.DESCENDING) {
-        avroOrder = Schema.Field.Order.DESCENDING;
-      } else {
-        avroOrder = Schema.Field.Order.ASCENDING;
-      }
-      orderedFields.add(new Schema.Field(fieldName, fieldSchema, "", null, avroOrder));
-      position++;
-    }
-    orderedSchema.setFields(orderedFields);
-    return orderedSchema;
-  }
-
-  /**
-   * Bundles the function that extracts a sort key from a tuple with the {@code PType} describing
-   * that key. Both are computed once, in the constructor.
-   */
-  public static class KeyExtraction<V extends Tuple> {
-
-    private PType<V> ptype;
-    private final ColumnOrder[] columnOrder;
-    private final int[] cols;
-
-    private MapFn<V, Object> byFn;
-    private PType<Object> keyPType;
-
-    /**
-     * @param ptype the type of the tuples to extract keys from
-     * @param columnOrder the columns that make up the key; one is subtracted from each
-     *        {@code column()} value to obtain the index used on the tuple
-     */
-    public KeyExtraction(PType<V> ptype, ColumnOrder[] columnOrder) {
-      this.ptype = ptype;
-      this.columnOrder = columnOrder;
-      this.cols = new int[columnOrder.length];
-      int slot = 0;
-      for (ColumnOrder order : columnOrder) {
-        cols[slot] = order.column() - 1;
-        slot++;
-      }
-      init();
-    }
-
-    /** Chooses {@code byFn} and {@code keyPType} based on how many key columns there are. */
-    private void init() {
-      List<PType> subTypes = ptype.getSubTypes();
-      PTypeFamily family = ptype.getFamily();
-
-      // A single column is used directly as the key; no tuple is built around it.
-      if (cols.length == 1) {
-        byFn = new SingleKeyFn(cols[0]);
-        keyPType = subTypes.get(cols[0]);
-        return;
-      }
-
-      TupleFactory factory;
-      if (cols.length == 2) {
-        factory = TupleFactory.PAIR;
-        keyPType = family.pairs(subTypes.get(cols[0]), subTypes.get(cols[1]));
-      } else if (cols.length == 3) {
-        factory = TupleFactory.TUPLE3;
-        keyPType = family.triples(subTypes.get(cols[0]), subTypes.get(cols[1]),
-            subTypes.get(cols[2]));
-      } else if (cols.length == 4) {
-        factory = TupleFactory.TUPLE4;
-        keyPType = family.quads(subTypes.get(cols[0]), subTypes.get(cols[1]),
-            subTypes.get(cols[2]), subTypes.get(cols[3]));
-      } else {
-        PType[] selected = new PType[cols.length];
-        for (int i = 0; i < selected.length; i++) {
-          selected[i] = subTypes.get(cols[i]);
-        }
-        factory = TupleFactory.TUPLEN;
-        keyPType = (PType<Object>) (PType<?>) family.tuples(selected);
-      }
-
-      // For the Avro family the key becomes a generic record whose schema carries the orderings.
-      if (family == AvroTypeFamily.getInstance()) {
-        Schema keySchema = createOrderedTupleSchema(keyPType, columnOrder);
-        keyPType = (PType<Object>) (PType<?>) Avros.generics(keySchema);
-        byFn = new AvroGenericFn(cols, keySchema);
-      } else {
-        byFn = new TupleKeyFn(cols, factory);
-      }
-    }
-
-    /** @return the function that maps an input tuple to its sort key */
-    public MapFn<V, Object> getByFn() {
-      return byFn;
-    }
-
-    /** @return the {@code PType} of the keys produced by {@link #getByFn()} */
-    public PType<Object> getKeyType() {
-      return keyPType;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java b/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
deleted file mode 100644
index 94fbdbe..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.sort;
-
-import java.io.IOException;
-import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.avro.Schema;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.crunch.io.CompositePathIterable;
-import org.apache.crunch.io.avro.AvroFileReaderFactory;
-import org.apache.crunch.io.seq.SeqFileReaderFactory;
-import org.apache.crunch.types.writable.WritableDeepCopier;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-/**
- * A partition-aware {@code Partitioner} instance that can work with either Avro or Writable-formatted
- * keys.
- */
-public class TotalOrderPartitioner<K, V> extends Partitioner<K, V> implements Configurable {
-
-  /** Partition file name used when {@link #PARTITIONER_PATH} is not set in the configuration. */
-  public static final String DEFAULT_PATH = "_partition.lst";
-  /** Configuration key that holds the path of the partition (split point) file. */
-  public static final String PARTITIONER_PATH = 
-    "crunch.totalorderpartitioner.path";
-  
-  private Configuration conf;
-  // Lookup structure built from the split points in setConf().
-  private Node<K> partitions;
-  
-  /** @return the configuration stored by {@link #setConf(Configuration)} */
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  /**
-   * Stores the configuration, reads the split points from the partition file and builds the
-   * lookup structure used by {@link #getPartition}.
-   *
-   * <p>The map output key class, the sort comparator and the number of reduce tasks are all
-   * obtained from a {@code Job} created over {@code conf}.
-   *
-   * @param conf the job configuration
-   * @throws IllegalArgumentException wrapping the {@code IOException} raised when the partition
-   *         file cannot be read or when it does not hold exactly {@code numReduceTasks - 1}
-   *         split points
-   */
-  @Override
-  public void setConf(Configuration conf) {
-    try {
-      this.conf = conf;
-      String parts = getPartitionFile(conf);
-      final Path partFile = new Path(parts);
-      // The default name is resolved against the local file system; any other path is resolved
-      // against the file system that the path itself designates.
-      final FileSystem fs = (DEFAULT_PATH.equals(parts))
-        ? FileSystem.getLocal(conf)     // assume in DistributedCache
-        : partFile.getFileSystem(conf);
-
-      Job job = new Job(conf);
-      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
-      RawComparator<K> comparator =
-          (RawComparator<K>) job.getSortComparator();
-      K[] splitPoints = readPartitions(fs, partFile, keyClass, conf, comparator);
-      int numReduceTasks = job.getNumReduceTasks();
-      // n reducers need n - 1 boundaries between them.
-      if (splitPoints.length != numReduceTasks - 1) {
-        throw new IOException("Wrong number of partitions in keyset");
-      }
-      partitions = new BinarySearchNode(splitPoints, comparator);
-    } catch (IOException e) {
-      throw new IllegalArgumentException("Can't read partitions file", e);
-    }
-  }
-
-  /**
-   * Returns the partition for {@code key} by locating it among the split points. The
-   * {@code value} and {@code modulo} arguments are not used.
-   */
-  @Override
-  public int getPartition(K key, V value, int modulo) {
-    return partitions.findPartition(key);
-  }
-
-  /** Records {@code p} in {@code conf} under {@link #PARTITIONER_PATH}. */
-  public static void setPartitionFile(Configuration conf, Path p) {
-    conf.set(PARTITIONER_PATH, p.toString());
-  }
-
-  /**
-   * @return the value stored under {@link #PARTITIONER_PATH}, or {@link #DEFAULT_PATH} when the
-   *         key is not set
-   */
-  public static String getPartitionFile(Configuration conf) {
-    return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
-  }
-  
-  /**
-   * Reads every key from the partition file, sorts the keys with {@code comparator} and returns
-   * them as an array whose component type is {@code keyClass}.
-   *
-   * <p>When the {@code crunch.schema} property is set, its value is parsed as an Avro schema,
-   * the file is read through an {@code AvroFileReaderFactory} and each datum is wrapped in an
-   * {@code AvroKey}. Otherwise the file is read through a {@code SeqFileReaderFactory} and each
-   * key is deep-copied before being stored (presumably because the reader may reuse the
-   * instance it returns; confirm against {@code SeqFileReaderFactory}).
-   *
-   * @throws IOException declared for the reads performed through {@code CompositePathIterable}
-   */
-  @SuppressWarnings("unchecked") // map output key class
-  private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
-      Configuration conf, final RawComparator<K> comparator) throws IOException {
-    ArrayList<K> parts = new ArrayList<K>();
-    String schema = conf.get("crunch.schema");
-    if (schema != null) {
-      Schema s = (new Schema.Parser()).parse(schema);
-      AvroFileReaderFactory<K> a = new AvroFileReaderFactory<K>(s);
-      Iterator<K> iter = CompositePathIterable.create(fs, p, a).iterator();
-      while (iter.hasNext()) {
-        parts.add((K) new AvroKey<K>(iter.next()));
-      }
-    } else {
-      WritableDeepCopier wdc = new WritableDeepCopier(keyClass);
-      SeqFileReaderFactory<K> s = new SeqFileReaderFactory<K>(keyClass);
-      Iterator<K> iter = CompositePathIterable.create(fs, p, s).iterator();
-      while (iter.hasNext()) {
-        parts.add((K) wdc.deepCopy((Writable) iter.next()));
-      }
-    }
-    // Binary search in findPartition() requires the split points to be in comparator order.
-    Collections.sort(parts, comparator);
-    return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
-  }
-  
-  /**
-   * Interface to the partitioner to locate a key in the partition keyset.
-   */
-  interface Node<T> {
-    /**
-     * Locate partition in keyset K, st [Ki..Ki+1) defines a partition,
-     * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
-     */
-    int findPartition(T key);
-  }
-  
-  /** {@link Node} implementation that binary-searches a sorted array of split points. */
-  class BinarySearchNode implements Node<K> {
-    private final K[] splitPoints;
-    private final RawComparator<K> comparator;
-    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
-      this.splitPoints = splitPoints;
-      this.comparator = comparator;
-    }
-    public int findPartition(K key) {
-      // Arrays.binarySearch returns the index i on a hit and -(insertionPoint) - 1 on a miss.
-      // After adding one, a hit yields i + 1 (a key equal to split point i belongs to the
-      // partition that starts at it) and a miss yields -(insertionPoint), which is negated below.
-      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
-      return (pos < 0) ? -pos : pos;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
deleted file mode 100644
index 2dcc64f..0000000
--- a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.materialize;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.io.PathTarget;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.impl.FileSourceImpl;
-import org.apache.hadoop.fs.Path;
-
-public class MaterializableIterable<E> implements Iterable<E> {
-
-  private static final Log LOG = LogFactory.getLog(MaterializableIterable.class);
-
-  private final Pipeline pipeline;
-  private final ReadableSource<E> source;
-  // Null until materialize() has read the source.
-  private Iterable<E> materialized;
-
-  /**
-   * @param pipeline the pipeline that is run before the source is first read
-   * @param source the source the elements are read from
-   */
-  public MaterializableIterable(Pipeline pipeline, ReadableSource<E> source) {
-    this.pipeline = pipeline;
-    this.source = source;
-    this.materialized = null;
-  }
-
-  /** @return the source given at construction */
-  public ReadableSource<E> getSource() {
-    return source;
-  }
-
-  /** @return whether the source is also a {@code SourceTarget} */
-  public boolean isSourceTarget() {
-    boolean alsoTarget = source instanceof SourceTarget;
-    return alsoTarget;
-  }
-
-  /**
-   * @return the path of the source when it is a {@code FileSourceImpl} or a {@code PathTarget};
-   *         null otherwise
-   */
-  public Path getPath() {
-    Path path = null;
-    if (source instanceof FileSourceImpl) {
-      FileSourceImpl fileSource = (FileSourceImpl) source;
-      path = fileSource.getPath();
-    } else if (source instanceof PathTarget) {
-      PathTarget pathTarget = (PathTarget) source;
-      path = pathTarget.getPath();
-    }
-    return path;
-  }
-
-  /**
-   * Returns an iterator over the source's elements. If nothing has been materialized yet, the
-   * pipeline is run and the source is read first.
-   */
-  @Override
-  public Iterator<E> iterator() {
-    boolean notYetRead = (materialized == null);
-    if (notYetRead) {
-      pipeline.run();
-      materialize();
-    }
-    return materialized.iterator();
-  }
-
-  /**
-   * Reads the source using the pipeline's configuration and keeps the result for later calls to
-   * {@link #iterator()}. An {@code IOException} is logged and rethrown wrapped in a
-   * {@code CrunchRuntimeException}.
-   */
-  public void materialize() {
-    try {
-      Iterable<E> contents = source.read(pipeline.getConfiguration());
-      materialized = contents;
-    } catch (IOException e) {
-      LOG.error("Could not materialize: " + source, e);
-      throw new CrunchRuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableMap.java b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
deleted file mode 100644
index 69082e2..0000000
--- a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.materialize;
-
-import java.util.AbstractMap;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.crunch.Pair;
-
-public class MaterializableMap<K, V> extends AbstractMap<K, V> {
-
-  private Iterable<Pair<K, V>> iterable;
-  // Built from the iterable on the first call to entrySet() and reused afterwards.
-  private Set<Map.Entry<K, V>> entrySet;
-
-  /**
-   * @param iterable the pairs that supply the keys and values of this map
-   */
-  public MaterializableMap(Iterable<Pair<K, V>> iterable) {
-    this.iterable = iterable;
-  }
-
-  /**
-   * Copies the pairs into a {@code HashMap}, in iteration order, and returns that map's entry
-   * set. A later pair replaces an earlier one that has the same key.
-   */
-  private Set<Map.Entry<K, V>> toMapEntries(Iterable<Pair<K, V>> xs) {
-    HashMap<K, V> collected = new HashMap<K, V>();
-    for (Pair<K, V> pair : xs) {
-      K key = pair.first();
-      V value = pair.second();
-      collected.put(key, value);
-    }
-    return collected.entrySet();
-  }
-
-  /** Returns the entry set, building it from the iterable the first time it is requested. */
-  @Override
-  public Set<Map.Entry<K, V>> entrySet() {
-    if (entrySet != null) {
-      return entrySet;
-    }
-    entrySet = toMapEntries(iterable);
-    return entrySet;
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java b/crunch/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java
deleted file mode 100644
index 60e64b1..0000000
--- a/crunch/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.materialize.pobject;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.crunch.PCollection;
-
-/**
- * A concrete implementation of {@link org.apache.crunch.materialize.pobject.PObjectImpl} whose
- * value is a Java {@link java.util.Collection} containing the elements of the underlying {@link
- * PCollection} for this {@link org.apache.crunch.PObject}.
- *
- * @param <S> The value type for elements contained in the {@code Collection} value encapsulated
- * by this {@code PObject}.
- */
-public class CollectionPObject<S> extends PObjectImpl<S, Collection<S>> {
-
-  /**
-   * Creates a {@code PObject} whose value is computed from the given collection.
-   *
-   * @param collect The backing {@code PCollection} for this {@code PObject}.
-   */
-  public CollectionPObject(PCollection<S> collect) {
-    super(collect);
-  }
-
-  /**
-   * Copies every element of {@code input}, in iteration order, into a new {@code ArrayList}.
-   *
-   * @param input the elements to copy
-   * @return the list holding the copied elements
-   */
-  @Override
-  public Collection<S> process(Iterable<S> input) {
-    Collection<S> elements = new ArrayList<S>();
-    for (S element : input) {
-      elements.add(element);
-    }
-    return elements;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java b/crunch/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
deleted file mode 100644
index aa5fd9e..0000000
--- a/crunch/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.materialize.pobject;
-
-import java.util.Iterator;
-
-import org.apache.crunch.PCollection;
-
-/**
- * A concrete implementation of {@link PObjectImpl} that uses the first element in the backing
- * {@link PCollection} as the {@link org.apache.crunch.PObject} value.
- *
- * @param <T> The value type of this {@code PObject}.
- */
-public class FirstElementPObject<T> extends PObjectImpl<T, T> {
-
-  /**
-   * Creates a {@code PObject} whose value is computed from the given collection.
-   *
-   * @param collect The backing {@code PCollection} for this {@code PObject}.
-   */
-  public FirstElementPObject(PCollection<T> collect) {
-    super(collect);
-  }
-
-  /**
-   * Returns the first element produced by {@code input}'s iterator.
-   *
-   * @param input the elements to inspect
-   * @return the first element, or null when {@code input} has no elements
-   */
-  @Override
-  public T process(Iterable<T> input) {
-    Iterator<T> elements = input.iterator();
-    return elements.hasNext() ? elements.next() : null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java b/crunch/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java
deleted file mode 100644
index 243997f..0000000
--- a/crunch/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.materialize.pobject;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.Pair;
-
-/**
- * A concrete implementation of {@link PObjectImpl} whose
- * value is a Java {@link Map}. The underlying {@link PCollection} for this
- * {@link org.apache.crunch.PObject} must contain {@link Pair}s of values. The
- * first element of the pair will be used as the map key, while the second element will be used
- * as the map value.  Note that the contents of the underlying {@code PCollection} may not be
- * reflected in the returned {@code Map}, since a single key may be mapped to several values in
- * the underlying {@code PCollection}, and only one of those values will appear in the {@code
- * Map} encapsulated by this {@code PObject}.
- *
- * @param <K> The type of keys for the Map.
- * @param <V> The type of values for the Map.
- */
-public class MapPObject<K, V> extends PObjectImpl<Pair<K, V>, Map<K, V>> {
-
-  /**
-   * Creates a {@code PObject} whose value is computed from the given collection of pairs.
-   *
-   * @param collect The backing {@code PCollection} for this {@code PObject}.
-   */
-  public MapPObject(PCollection<Pair<K, V>> collect) {
-    super(collect);
-  }
-
-  /**
-   * Puts every pair of {@code input}, in iteration order, into a new {@code HashMap}, using the
-   * pair's first element as key and its second as value. A later pair replaces an earlier one
-   * that has the same key.
-   *
-   * @param input the pairs to collect
-   * @return the map built from the pairs
-   */
-  @Override
-  public Map<K, V> process(Iterable<Pair<K, V>> input) {
-    Map<K, V> collected = new HashMap<K, V>();
-    for (Pair<K, V> entry : input) {
-      collected.put(entry.first(), entry.second());
-    }
-    return collected;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java b/crunch/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java
deleted file mode 100644
index 59c2ba2..0000000
--- a/crunch/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.materialize.pobject;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PObject;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.Target;
-
-/**
- * An abstract implementation of {@link PObject} that is backed by a {@link PCollection}.
- * Clients creating a concrete implementation should override the method
- * {@link PObjectImpl#process(Iterable)}, which transforms the backing PCollection into the
- * singleton value encapsulated by the PObject. Once this {@code PObject}'s value has been
- * calculated, the value is cached to prevent subsequent materializations of the backing
- * {@code PCollection}.
- *
- * @param <S> The type contained in the underlying PCollection.
- * @param <T> The type encapsulated by this PObject.
- */
-public abstract class PObjectImpl<S, T> implements PObject<T> {
-
-  // The PCollection that is materialized to compute this PObject's value.
-  private PCollection<S> collection;
-
-  // The value computed by the first call to getValue(); kept so that the backing PCollection
-  // is not materialized again.
-  private T cachedValue;
-
-  // True once cachedValue holds a computed value. A separate flag is used instead of a null
-  // check on cachedValue.
-  private boolean isCached;
-
-  /**
-   * Creates a {@code PObject} backed by the given collection. No value is computed yet.
-   *
-   * @param collect The backing {@code PCollection} for this {@code PObject}.
-   */
-  public PObjectImpl(PCollection<S> collect) {
-    this.collection = collect;
-    this.cachedValue = null;
-    this.isCached = false;
-  }
-
-  /** Returns the string form of the backing {@code PCollection}. */
-  @Override
-  public String toString() {
-    return collection.toString();
-  }
-
-  /**
-   * Returns this {@code PObject}'s value. The first call materializes the backing
-   * {@code PCollection} and passes the result to {@link #process(Iterable)}; later calls return
-   * the stored result.
-   */
-  @Override
-  public final T getValue() {
-    if (isCached) {
-      return cachedValue;
-    }
-    Iterable<S> contents = collection.materialize();
-    cachedValue = process(contents);
-    isCached = true;
-    return cachedValue;
-  }
-
-  /**
-   * Computes the value of this {@code PObject} from the elements of the backing
-   * {@link PCollection}.
-   *
-   * @param input An Iterable whose elements correspond to those of the backing {@code
-   * PCollection}.
-   * @return The value of this {@code PObject}.
-   */
-  protected abstract T process(Iterable<S> input);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/package-info.java b/crunch/src/main/java/org/apache/crunch/package-info.java
deleted file mode 100644
index 38f11bc..0000000
--- a/crunch/src/main/java/org/apache/crunch/package-info.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Client-facing API and core abstractions.
- *
- * @see <a href="http://crunch.apache.org/intro.html">Introduction to
- *      Apache Crunch</a>
- */
-package org.apache.crunch;

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
deleted file mode 100644
index 151ab82..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Lists;
-
-/**
- * Performs deep copies (based on underlying PType deep copying) of Collections.
- * 
- * @param <T> The type of the elements in the Collection being copied
- */
-public class CollectionDeepCopier<T> implements DeepCopier<Collection<T>> {
-
-  private PType<T> elementType;
-
-  public CollectionDeepCopier(PType<T> elementType) {
-    this.elementType = elementType;
-  }
-
-  @Override
-  public void initialize(Configuration conf) {
-    this.elementType.initialize(conf);
-  }
-
-  @Override
-  public Collection<T> deepCopy(Collection<T> source) {
-    if (source == null) {
-      return null;
-    }
-    List<T> copiedCollection = Lists.newArrayListWithCapacity(source.size());
-    for (T value : source) {
-      copiedCollection.add(elementType.getDetachedValue(value));
-    }
-    return copiedCollection;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/Converter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/Converter.java b/crunch/src/main/java/org/apache/crunch/types/Converter.java
deleted file mode 100644
index a0dbb16..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/Converter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.io.Serializable;
-
-import org.apache.crunch.DoFn;
-
-/**
- * Converts the input key/value from a MapReduce task into the input to a
- * {@link DoFn}, or takes the output of a {@code DoFn} and writes it to the
- * output key/values.
- */
-public interface Converter<K, V, S, T> extends Serializable {
-  S convertInput(K key, V value);
-
-  T convertIterableInput(K key, Iterable<V> value);
-
-  K outputKey(S value);
-
-  V outputValue(S value);
-
-  Class<K> getKeyClass();
-
-  Class<V> getValueClass();
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
deleted file mode 100644
index f146e86..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.io.Serializable;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Performs deep copies of values.
- * 
- * @param <T> The type of value that will be copied
- */
-public interface DeepCopier<T> extends Serializable {
-
-  /**
-   * Initialize the deep copier with a job-specific configuration
-   * 
-   * @param conf Job-specific configuration
-   */
-  void initialize(Configuration conf);
-
-  /**
-   * Create a deep copy of a value.
-   * 
-   * @param source The value to be copied
-   * @return The deep copy of the value
-   */
-  T deepCopy(T source);
-
-  static class NoOpDeepCopier<V> implements DeepCopier<V> {
-
-    @Override
-    public V deepCopy(V source) {
-      return source;
-    }
-
-    @Override
-    public void initialize(Configuration conf) {
-      // No initialization needed
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java
deleted file mode 100644
index de8903b..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Maps;
-
-public class MapDeepCopier<T> implements DeepCopier<Map<String, T>> {
-
-  private final PType<T> ptype;
-
-  public MapDeepCopier(PType<T> ptype) {
-    this.ptype = ptype;
-  }
-
-  @Override
-  public void initialize(Configuration conf) {
-    this.ptype.initialize(conf);
-  }
-
-  @Override
-  public Map<String, T> deepCopy(Map<String, T> source) {
-    if (source == null) {
-      return null;
-    }
-    
-    Map<String, T> deepCopyMap = Maps.newHashMap();
-    for (Entry<String, T> entry : source.entrySet()) {
-      deepCopyMap.put(entry.getKey(), ptype.getDetachedValue(entry.getValue()));
-    }
-    return deepCopyMap;
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
deleted file mode 100644
index d276cd6..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PGroupedTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-import com.google.common.collect.Iterables;
-
-/**
- * The {@code PType} instance for {@link PGroupedTable} instances. Its settings
- * are derived from the {@code PTableType} that was grouped to create the
- * {@code PGroupedTable} instance.
- * 
- */
-public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<V>>> {
-
-  protected static class PTypeIterable<V> implements Iterable<V> {
-    private final Iterable<Object> iterable;
-    private final MapFn<Object, V> mapFn;
-
-    public PTypeIterable(MapFn<Object, V> mapFn, Iterable<Object> iterable) {
-      this.mapFn = mapFn;
-      this.iterable = iterable;
-    }
-
-    public Iterator<V> iterator() {
-      return new Iterator<V>() {
-        Iterator<Object> iter = iterable.iterator();
-
-        public boolean hasNext() {
-          return iter.hasNext();
-        }
-
-        public V next() {
-          return mapFn.map(iter.next());
-        }
-
-        public void remove() {
-          iter.remove();
-        }
-      };
-    }
-    
-    @Override
-    public String toString() {
-      return Iterables.toString(this);
-    }
-  }
-
-  public static class PairIterableMapFn<K, V> extends MapFn<Pair<Object, Iterable<Object>>, Pair<K, Iterable<V>>> {
-    private final MapFn<Object, K> keys;
-    private final MapFn<Object, V> values;
-
-    public PairIterableMapFn(MapFn<Object, K> keys, MapFn<Object, V> values) {
-      this.keys = keys;
-      this.values = values;
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      keys.configure(conf);
-      values.configure(conf);
-    }
-    
-    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-      keys.setContext(context);
-      values.setContext(context);
-    }
-    
-    @Override
-    public void initialize() {
-      keys.initialize();
-      values.initialize();
-    }
-
-    @Override
-    public Pair<K, Iterable<V>> map(Pair<Object, Iterable<Object>> input) {
-      return Pair.<K, Iterable<V>> of(keys.map(input.first()), new PTypeIterable(values, input.second()));
-    }
-  }
-
-  protected final PTableType<K, V> tableType;
-
-  public PGroupedTableType(PTableType<K, V> tableType) {
-    this.tableType = tableType;
-  }
-
-  public PTableType<K, V> getTableType() {
-    return tableType;
-  }
-
-  @Override
-  public PTypeFamily getFamily() {
-    return tableType.getFamily();
-  }
-
-  @Override
-  public List<PType> getSubTypes() {
-    return tableType.getSubTypes();
-  }
-
-  @Override
-  public Converter getConverter() {
-    return tableType.getConverter();
-  }
-
-  public abstract Converter getGroupingConverter();
-
-  public abstract void configureShuffle(Job job, GroupingOptions options);
-
-  @Override
-  public ReadableSourceTarget<Pair<K, Iterable<V>>> getDefaultFileSource(Path path) {
-    throw new UnsupportedOperationException("Grouped tables cannot be written out directly");
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/PTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PTableType.java b/crunch/src/main/java/org/apache/crunch/types/PTableType.java
deleted file mode 100644
index 3d06f8b..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/PTableType.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-
-/**
- * An extension of {@code PType} specifically for {@link PTable} objects. It
- * allows separate access to the {@code PType}s of the key and value for the
- * {@code PTable}.
- * 
- */
-public interface PTableType<K, V> extends PType<Pair<K, V>> {
-  /**
-   * Returns the key type for the table.
-   */
-  PType<K> getKeyType();
-
-  /**
-   * Returns the value type for the table.
-   */
-  PType<V> getValueType();
-
-  /**
-   * Returns the grouped table version of this type.
-   */
-  PGroupedTableType<K, V> getGroupedTableType();
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/PType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PType.java b/crunch/src/main/java/org/apache/crunch/types/PType.java
deleted file mode 100644
index ebddf84..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/PType.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-/**
- * A {@code PType} defines a mapping between a data type that is used in a Crunch pipeline and a
- * serialization and storage format that is used to read/write data from/to HDFS. Every
- * {@link PCollection} has an associated {@code PType} that tells Crunch how to read/write data from
- * that {@code PCollection}.
- * 
- */
-public interface PType<T> extends Serializable {
-  /**
-   * Returns the Java type represented by this {@code PType}.
-   */
-  Class<T> getTypeClass();
-
-  /**
-   * Returns the {@code PTypeFamily} that this {@code PType} belongs to.
-   */
-  PTypeFamily getFamily();
-
-  MapFn<Object, T> getInputMapFn();
-
-  MapFn<T, Object> getOutputMapFn();
-
-  Converter getConverter();
-
-  /**
-   * Initialize this PType for use within a DoFn. This generally only needs to be called when using
-   * a PType for {@link #getDetachedValue(Object)}.
-   * 
-   * @param conf Configuration object
-   * @see PType#getDetachedValue(Object)
-   */
-  void initialize(Configuration conf);
-
-  /**
-   * Returns a copy of a value (or the value itself) that can safely be retained.
-   * <p>
-   * This is useful when iterable values being processed in a DoFn (via a reducer) need to be held
-   * on to for more than the scope of a single iteration, as a reducer (and therefore also a DoFn
-   * that has an Iterable as input) re-uses deserialized values. More information on object reuse is
-   * available in the {@link DoFn} class documentation.
-   * 
-   * @param value The value to be deep-copied
-   * @return A deep copy of the input value
-   */
-  T getDetachedValue(T value);
-
-  /**
-   * Returns a {@code SourceTarget} that is able to read/write data using the serialization format
-   * specified by this {@code PType}.
-   */
-  ReadableSourceTarget<T> getDefaultFileSource(Path path);
-
-  /**
-   * Returns the sub-types that make up this PType if it is a composite instance, such as a tuple.
-   */
-  List<PType> getSubTypes();
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/PTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PTypeFamily.java b/crunch/src/main/java/org/apache/crunch/types/PTypeFamily.java
deleted file mode 100644
index 9458f14..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/PTypeFamily.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-
-/**
- * An abstract factory for creating {@code PType} instances that have the same
- * serialization/storage backing format.
- * 
- */
-public interface PTypeFamily {
-  PType<Void> nulls();
-
-  PType<String> strings();
-
-  PType<Long> longs();
-
-  PType<Integer> ints();
-
-  PType<Float> floats();
-
-  PType<Double> doubles();
-
-  PType<Boolean> booleans();
-
-  PType<ByteBuffer> bytes();
-
-  <T> PType<T> records(Class<T> clazz);
-
-  <T> PType<Collection<T>> collections(PType<T> ptype);
-
-  <T> PType<Map<String, T>> maps(PType<T> ptype);
-
-  <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2);
-
-  <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3);
-
-  <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4);
-
-  PType<TupleN> tuples(PType<?>... ptypes);
-
-  <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes);
-
-  <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base);
-
-  <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value);
-
-  /**
-   * Returns the equivalent of the given ptype for this family, if it exists.
-   */
-  <T> PType<T> as(PType<T> ptype);
-}