You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/05/22 17:47:54 UTC

git commit: CRUNCH-208: Add mapValues convenience functions for PTable and PGroupedTable as well as a mapKeys function for PTable. Deprecate the MapKeysFn and MapValuesFn in favor of these new methods.

Updated Branches:
  refs/heads/master fc2d5782a -> b24dc5804


CRUNCH-208: Add mapValues convenience functions for PTable and PGroupedTable as well
as a mapKeys function for PTable. Deprecate the MapKeysFn and MapValuesFn in favor
of these new methods.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b24dc580
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b24dc580
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b24dc580

Branch: refs/heads/master
Commit: b24dc58044f655e456400d91fd10513954b7f654
Parents: fc2d578
Author: Josh Wills <jw...@apache.org>
Authored: Mon May 20 00:01:17 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Mon May 20 11:40:23 2013 -0700

----------------------------------------------------------------------
 .../src/it/java/org/apache/crunch/PageRankIT.java  |   22 ++--
 .../src/it/java/org/apache/crunch/TfIdfIT.java     |   24 ++--
 .../main/java/org/apache/crunch/PGroupedTable.java |   35 +++++-
 .../src/main/java/org/apache/crunch/PTable.java    |   24 ++++
 .../main/java/org/apache/crunch/fn/MapKeysFn.java  |    3 +
 .../java/org/apache/crunch/fn/MapValuesFn.java     |    3 +
 .../crunch/impl/mem/collect/MemGroupedTable.java   |   20 +++-
 .../apache/crunch/impl/mem/collect/MemTable.java   |   21 +++
 .../crunch/impl/mr/collect/DoCollectionImpl.java   |    3 -
 .../crunch/impl/mr/collect/PGroupedTableImpl.java  |   18 +++
 .../apache/crunch/impl/mr/collect/PTableBase.java  |   21 +++
 .../main/java/org/apache/crunch/lib/PTables.java   |   99 +++++++++++++++
 12 files changed, 263 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
index 6291ef8..23c71b3 100644
--- a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
@@ -119,19 +119,19 @@ public class PageRankIT {
       }
     }, ptf.tableOf(ptf.strings(), ptf.floats()));
 
-    return input.cogroup(outbound).parallelDo(
-        new MapFn<Pair<String, Pair<Collection<PageRankData>, Collection<Float>>>, Pair<String, PageRankData>>() {
+    return input.cogroup(outbound).mapValues(
+        new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
           @Override
-          public Pair<String, PageRankData> map(Pair<String, Pair<Collection<PageRankData>, Collection<Float>>> input) {
-            PageRankData prd = Iterables.getOnlyElement(input.second().first());
-            Collection<Float> propagatedScores = input.second().second();
+          public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) {
+            PageRankData prd = Iterables.getOnlyElement(input.first());
+            Collection<Float> propagatedScores = input.second();
             float sum = 0.0f;
             for (Float s : propagatedScores) {
               sum += s;
             }
-            return Pair.of(input.first(), prd.next(d + (1.0f - d) * sum));
+            return prd.next(d + (1.0f - d) * sum);
           }
-        }, input.getPTableType());
+        }, input.getValueType());
   }
 
   public static void run(Pipeline pipeline, String urlInput,
@@ -144,12 +144,12 @@ public class PageRankIT {
             return Pair.of(urls[0], urls[1]);
           }
         }, ptf.tableOf(ptf.strings(), ptf.strings())).groupByKey()
-        .parallelDo(new MapFn<Pair<String, Iterable<String>>, Pair<String, PageRankData>>() {
+        .mapValues(new MapFn<Iterable<String>, PageRankData>() {
           @Override
-          public Pair<String, PageRankData> map(Pair<String, Iterable<String>> input) {
-            return Pair.of(input.first(), new PageRankData(1.0f, 0.0f, input.second()));
+          public PageRankData map(Iterable<String> input) {
+            return new PageRankData(1.0f, 0.0f, input);
           }
-        }, ptf.tableOf(ptf.strings(), prType));
+        }, prType);
 
     Float delta = 1.0f;
     while (delta > 0.01) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java b/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
index 218f538..23e45ca 100644
--- a/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
@@ -152,24 +152,22 @@ public class TfIdfIT implements Serializable {
      * Collection<Pair<title, tfidf>>>
      */
     return joinedResults
-        .parallelDo(
-            "calculate tfidf",
-            new MapFn<Pair<String, Pair<Long, Collection<Pair<String, Long>>>>, Pair<String, Collection<Pair<String, Double>>>>() {
+        .mapValues(
+            new MapFn<Pair<Long, Collection<Pair<String, Long>>>, Collection<Pair<String, Double>>>() {
               @Override
-              public Pair<String, Collection<Pair<String, Double>>> map(
-                  Pair<String, Pair<Long, Collection<Pair<String, Long>>>> input) {
+              public Collection<Pair<String, Double>> map(
+                  Pair<Long, Collection<Pair<String, Long>>> input) {
                 Collection<Pair<String, Double>> tfidfs = Lists.newArrayList();
-                String word = input.first();
-                double n = input.second().first();
+                double n = input.first();
                 double idf = Math.log(N / n);
-                for (Pair<String, Long> tf : input.second().second()) {
+                for (Pair<String, Long> tf : input.second()) {
                   double tfidf = tf.second() * idf;
                   tfidfs.add(Pair.of(tf.first(), tfidf));
                 }
-                return Pair.of(word, tfidfs);
+                return tfidfs;
               }
 
-            }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles()))));
+            }, ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles())));
   }
 
   public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean singleRun) throws IOException {
@@ -187,13 +185,13 @@ public class TfIdfIT implements Serializable {
       pipeline.run();
     }
 
-    PTable<String, Collection<Pair<String, Double>>> uppercased = results.parallelDo(
-        new MapKeysFn<String, String, Collection<Pair<String, Double>>>() {
+    PTable<String, Collection<Pair<String, Double>>> uppercased = results.mapKeys(
+        new MapFn<String, String>() {
           @Override
           public String map(String k1) {
             return k1.toUpperCase();
           }
-        }, results.getPTableType());
+        }, results.getKeyType());
     pipeline.writeTextFile(uppercased, outputPath2);
     pipeline.done();
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
index d77ffdb..68085c6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
@@ -18,10 +18,12 @@
 package org.apache.crunch;
 
 import org.apache.crunch.Aggregator;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PType;
 
 /**
- * The Crunch representation of a grouped {@link PTable}.
- * 
+ * The Crunch representation of a grouped {@link PTable}, which corresponds to the output of
+ * the shuffle phase of a MapReduce job.
  */
 public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
 
@@ -45,9 +47,38 @@ public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
   PTable<K, V> combineValues(Aggregator<V> aggregator);
 
   /**
+   * Maps the {@code Iterable<V>} elements of each record to a new type. Just like
+   * any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be
+   * called once.
+   * 
+   * @param mapFn The mapping function
+   * @param ptype The serialization information for the returned data
+   * @return A new {@code PTable} instance
+   */
+  <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype);
+  
+  /**
+   * Maps the {@code Iterable<V>} elements of each record to a new type. Just like
+   * any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be
+   * called once.
+   * 
+   * @param name A name for this operation
+   * @param mapFn The mapping function
+   * @param ptype The serialization information for the returned data
+   * @return A new {@code PTable} instance
+   */
+  <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype);
+
+  /**
    * Convert this grouping back into a multimap.
    * 
    * @return an ungrouped version of the data in this {@code PGroupedTable}.
    */
   PTable<K, V> ungroup();
+  
+  /**
+   * Return the {@code PGroupedTableType} containing serialization information for
+   * this {@code PGroupedTable}.
+   */
+  PGroupedTableType<K, V> getGroupedTableType();
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/PTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PTable.java b/crunch-core/src/main/java/org/apache/crunch/PTable.java
index 8df9853..738b3cb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PTable.java
@@ -96,6 +96,30 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
   PType<V> getValueType();
 
   /**
+   * Returns a {@code PTable} that has the same keys as this instance, but
+   * uses the given function to map the values.
+   */
+  <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype);
+
+  /**
+   * Returns a {@code PTable} that has the same keys as this instance, but
+   * uses the given function to map the values.
+   */
+  <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype);
+ 
+  /**
+   * Returns a {@code PTable} that has the same values as this instance, but
+   * uses the given function to map the keys.
+   */
+  <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype);
+  
+  /**
+   * Returns a {@code PTable} that has the same values as this instance, but
+   * uses the given function to map the keys.
+   */
+  <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype);
+  
+  /**
    * Aggregate all of the values with the same key into a single key-value pair
    * in the returned PTable.
    */

http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
index cbaf24d..1dd8130 100644
--- a/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
@@ -21,6 +21,9 @@ import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.Pair;
 
+/**
+ * @deprecated Use {@link org.apache.crunch.PTable#mapKeys(org.apache.crunch.MapFn, org.apache.crunch.types.PType)}
+ */
 public abstract class MapKeysFn<K1, K2, V> extends DoFn<Pair<K1, V>, Pair<K2, V>> {
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
index b90f5ff..9b171f4 100644
--- a/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
@@ -21,6 +21,9 @@ import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.Pair;
 
+/**
+ * @deprecated Use {@link org.apache.crunch.PTable#mapValues(org.apache.crunch.MapFn, org.apache.crunch.types.PType)}
+ */
 public abstract class MapValuesFn<K, V1, V2> extends DoFn<Pair<K, V1>, Pair<K, V2>> {
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index d105bb4..12c17b6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -25,6 +25,7 @@ import java.util.TreeMap;
 import org.apache.crunch.Aggregator;
 import org.apache.crunch.CombineFn;
 import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PTable;
@@ -32,6 +33,8 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
@@ -74,13 +77,18 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
 
   @Override
   public PType<Pair<K, Iterable<V>>> getPType() {
+    return getGroupedTableType();
+  }
+
+  @Override
+  public PGroupedTableType<K, V> getGroupedTableType() {
     PTableType<K, V> parentType = parent.getPTableType();
     if (parentType != null) {
       return parentType.getGroupedTableType();
     }
     return null;
   }
-
+  
   @Override
   public PTypeFamily getTypeFamily() {
     return parent.getTypeFamily();
@@ -107,6 +115,16 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
   }
 
   @Override
+  public <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
+    return PTables.mapValues(this, mapFn, ptype);
+  }
+  
+  @Override
+  public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
+    return PTables.mapValues(name, this, mapFn, ptype);
+  }
+  
+  @Override
   public PTable<K, V> ungroup() {
     return parent;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
index f8a5960..99405e6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PObject;
@@ -129,6 +130,26 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
   }
 
   @Override
+  public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) {
+    return PTables.mapValues(this, mapFn, ptype);
+  }
+  
+  @Override
+  public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) {
+    return PTables.mapValues(name, this, mapFn, ptype);
+  }
+  
+  @Override
+  public <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype) {
+    return PTables.mapKeys(this, mapFn, ptype);
+  }
+  
+  @Override
+  public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) {
+    return PTables.mapKeys(name, this, mapFn, ptype);
+  }
+  
+  @Override
   public PTable<K, V> top(int count) {
     return Aggregate.top(this, count, true);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
index 7b8f2ea..8881e3f 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
@@ -18,16 +18,13 @@
 package org.apache.crunch.impl.mr.collect;
 
 import java.util.List;
-import java.util.Set;
 
 import org.apache.crunch.DoFn;
 import org.apache.crunch.ParallelDoOptions;
-import org.apache.crunch.SourceTarget;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PType;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 
 public class DoCollectionImpl<S> extends PCollectionImpl<S> {
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
index ccac5d5..d277b75 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -27,12 +27,14 @@ import org.apache.crunch.CombineFn;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.util.PartitionUtils;
@@ -103,11 +105,27 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
     }
   }
 
+  @Override
   public PTable<K, V> ungroup() {
     return parallelDo("ungroup", new Ungroup<K, V>(), parent.getPTableType());
   }
 
   @Override
+  public <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
+    return PTables.mapValues(this, mapFn, ptype);
+  }
+  
+  @Override
+  public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
+    return PTables.mapValues(name, this, mapFn, ptype);
+  }
+  
+  @Override
+  public PGroupedTableType<K, V> getGroupedTableType() {
+    return ptype;
+  }
+  
+  @Override
   protected void acceptInternal(PCollectionImpl.Visitor visitor) {
     visitor.visitGroupedTable(this);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
index 3c2393d..c477fad 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
 import org.apache.crunch.PTable;
@@ -119,6 +120,26 @@ abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements P
   }
   
   @Override
+  public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) {
+    return PTables.mapValues(this, mapFn, ptype);
+  }
+  
+  @Override
+  public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) {
+    return PTables.mapValues(name, this, mapFn, ptype);
+  }
+  
+  @Override
+  public <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype) {
+    return PTables.mapKeys(this, mapFn, ptype);
+  }
+  
+  @Override
+  public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) {
+    return PTables.mapKeys(name, this, mapFn, ptype);
+  }
+  
+  @Override
   public PTable<K, V> top(int count) {
     return Aggregate.top(this, count, true);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java b/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
index e907680..e0a3bf3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
@@ -21,11 +21,13 @@ import java.util.List;
 
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.fn.PairMapFn;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
@@ -51,6 +53,103 @@ public class PTables {
     DoFn<Pair<K, V>, Pair<K, V>> id = IdentityFn.getInstance();
     return pcollect.parallelDo("asPTable", id, ptt);
   }
+
+  /**
+   * Maps a {@code PTable<K1, V>} to a {@code PTable<K2, V>} using the given {@code MapFn<K1, K2>} on
+   * the keys of the {@code PTable}.
+   * 
+   * @param ptable The {@code PTable} to be mapped
+   * @param mapFn The mapping function
+   * @param ptype The PType for the returned keys
+   * @return A new {@code PTable<K2, V>} instance
+   */
+  public static <K1, K2, V> PTable<K2, V> mapKeys(PTable<K1, V> ptable, MapFn<K1, K2> mapFn,
+      PType<K2> ptype) {
+    return mapKeys("PTables.mapKeys", ptable, mapFn, ptype);
+  }
+  
+  /**
+   * Maps a {@code PTable<K1, V>} to a {@code PTable<K2, V>} using the given {@code MapFn<K1, K2>} on
+   * the keys of the {@code PTable}.
+   * 
+   * @param name The name of the transform
+   * @param ptable The {@code PTable} to be mapped
+   * @param mapFn The mapping function
+   * @param ptype The PType for the returned keys
+   * @return A new {@code PTable<K2, V>} instance
+   */
+  public static <K1, K2, V> PTable<K2, V> mapKeys(String name, PTable<K1, V> ptable, MapFn<K1, K2> mapFn,
+      PType<K2> ptype) {
+    PTypeFamily ptf = ptable.getTypeFamily();
+    return ptable.parallelDo(name,
+        new PairMapFn<K1, V, K2, V>(mapFn, IdentityFn.<V>getInstance()),
+        ptf.tableOf(ptype, ptable.getValueType()));
+  }
+  
+  /**
+   * Maps a {@code PTable<K, U>} to a {@code PTable<K, V>} using the given {@code MapFn<U, V>} on
+   * the values of the {@code PTable}.
+   * 
+   * @param ptable The {@code PTable} to be mapped
+   * @param mapFn The mapping function
+   * @param ptype The PType for the returned values
+   * @return A new {@code PTable<K, V>} instance
+   */
+  public static <K, U, V> PTable<K, V> mapValues(PTable<K, U> ptable, MapFn<U, V> mapFn,
+      PType<V> ptype) {
+    return mapValues("PTables.mapValues", ptable, mapFn, ptype);
+  }
+  
+  /**
+   * Maps a {@code PTable<K, U>} to a {@code PTable<K, V>} using the given {@code MapFn<U, V>} on
+   * the values of the {@code PTable}.
+   * 
+   * @param name The name of the transform
+   * @param ptable The {@code PTable} to be mapped
+   * @param mapFn The mapping function
+   * @param ptype The PType for the returned values
+   * @return A new {@code PTable<K, V>} instance
+   */
+  public static <K, U, V> PTable<K, V> mapValues(String name, PTable<K, U> ptable, MapFn<U, V> mapFn,
+      PType<V> ptype) {
+    PTypeFamily ptf = ptable.getTypeFamily();
+    return ptable.parallelDo(name,
+        new PairMapFn<K, U, K, V>(IdentityFn.<K>getInstance(), mapFn),
+        ptf.tableOf(ptable.getKeyType(), ptype));
+  }
+  
+  /**
+   * An analogue of the {@code mapValues} function for {@code PGroupedTable<K, U>} collections.
+   * 
+   * @param ptable The {@code PGroupedTable} to be mapped
+   * @param mapFn The mapping function
+   * @param ptype The PType for the returned values
+   * @return A new {@code PTable<K, V>} instance
+   */
+  public static <K, U, V> PTable<K, V> mapValues(PGroupedTable<K, U> ptable,
+      MapFn<Iterable<U>, V> mapFn,
+      PType<V> ptype) {
+    return mapValues("PTables.mapValues", ptable, mapFn, ptype);
+  }
+  
+  /**
+   * An analogue of the {@code mapValues} function for {@code PGroupedTable<K, U>} collections.
+   * 
+   * @param name The name of the operation
+   * @param ptable The {@code PGroupedTable} to be mapped
+   * @param mapFn The mapping function
+   * @param ptype The PType for the returned values
+   * @return A new {@code PTable<K, V>} instance
+   */
+  public static <K, U, V> PTable<K, V> mapValues(String name,
+      PGroupedTable<K, U> ptable,
+      MapFn<Iterable<U>, V> mapFn,
+      PType<V> ptype) {
+    PTypeFamily ptf = ptable.getTypeFamily();
+    return ptable.parallelDo(name,
+        new PairMapFn<K, Iterable<U>, K, V>(IdentityFn.<K>getInstance(), mapFn),
+        ptf.tableOf((PType<K>) ptable.getPType().getSubTypes().get(0), ptype));
+  }
   
   /**
    * Extract the keys from the given {@code PTable<K, V>} as a {@code PCollection<K>}.