You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/05/22 17:47:54 UTC
git commit: CRUNCH-208: Add mapValues convenience functions for
PTable and PGroupedTable as well as a mapKeys function for PTable. Deprecate
the MapKeysFn and MapValuesFn in favor of these new methods.
Updated Branches:
refs/heads/master fc2d5782a -> b24dc5804
CRUNCH-208: Add mapValues convenience functions for PTable and PGroupedTable as well
as a mapKeys function for PTable. Deprecate the MapKeysFn and MapValuesFn in favor
of these new methods.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b24dc580
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b24dc580
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b24dc580
Branch: refs/heads/master
Commit: b24dc58044f655e456400d91fd10513954b7f654
Parents: fc2d578
Author: Josh Wills <jw...@apache.org>
Authored: Mon May 20 00:01:17 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Mon May 20 11:40:23 2013 -0700
----------------------------------------------------------------------
.../src/it/java/org/apache/crunch/PageRankIT.java | 22 ++--
.../src/it/java/org/apache/crunch/TfIdfIT.java | 24 ++--
.../main/java/org/apache/crunch/PGroupedTable.java | 35 +++++-
.../src/main/java/org/apache/crunch/PTable.java | 24 ++++
.../main/java/org/apache/crunch/fn/MapKeysFn.java | 3 +
.../java/org/apache/crunch/fn/MapValuesFn.java | 3 +
.../crunch/impl/mem/collect/MemGroupedTable.java | 20 +++-
.../apache/crunch/impl/mem/collect/MemTable.java | 21 +++
.../crunch/impl/mr/collect/DoCollectionImpl.java | 3 -
.../crunch/impl/mr/collect/PGroupedTableImpl.java | 18 +++
.../apache/crunch/impl/mr/collect/PTableBase.java | 21 +++
.../main/java/org/apache/crunch/lib/PTables.java | 99 +++++++++++++++
12 files changed, 263 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
index 6291ef8..23c71b3 100644
--- a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
@@ -119,19 +119,19 @@ public class PageRankIT {
}
}, ptf.tableOf(ptf.strings(), ptf.floats()));
- return input.cogroup(outbound).parallelDo(
- new MapFn<Pair<String, Pair<Collection<PageRankData>, Collection<Float>>>, Pair<String, PageRankData>>() {
+ return input.cogroup(outbound).mapValues(
+ new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
@Override
- public Pair<String, PageRankData> map(Pair<String, Pair<Collection<PageRankData>, Collection<Float>>> input) {
- PageRankData prd = Iterables.getOnlyElement(input.second().first());
- Collection<Float> propagatedScores = input.second().second();
+ public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) {
+ PageRankData prd = Iterables.getOnlyElement(input.first());
+ Collection<Float> propagatedScores = input.second();
float sum = 0.0f;
for (Float s : propagatedScores) {
sum += s;
}
- return Pair.of(input.first(), prd.next(d + (1.0f - d) * sum));
+ return prd.next(d + (1.0f - d) * sum);
}
- }, input.getPTableType());
+ }, input.getValueType());
}
public static void run(Pipeline pipeline, String urlInput,
@@ -144,12 +144,12 @@ public class PageRankIT {
return Pair.of(urls[0], urls[1]);
}
}, ptf.tableOf(ptf.strings(), ptf.strings())).groupByKey()
- .parallelDo(new MapFn<Pair<String, Iterable<String>>, Pair<String, PageRankData>>() {
+ .mapValues(new MapFn<Iterable<String>, PageRankData>() {
@Override
- public Pair<String, PageRankData> map(Pair<String, Iterable<String>> input) {
- return Pair.of(input.first(), new PageRankData(1.0f, 0.0f, input.second()));
+ public PageRankData map(Iterable<String> input) {
+ return new PageRankData(1.0f, 0.0f, input);
}
- }, ptf.tableOf(ptf.strings(), prType));
+ }, prType);
Float delta = 1.0f;
while (delta > 0.01) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java b/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
index 218f538..23e45ca 100644
--- a/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
@@ -152,24 +152,22 @@ public class TfIdfIT implements Serializable {
* Collection<Pair<title, tfidf>>>
*/
return joinedResults
- .parallelDo(
- "calculate tfidf",
- new MapFn<Pair<String, Pair<Long, Collection<Pair<String, Long>>>>, Pair<String, Collection<Pair<String, Double>>>>() {
+ .mapValues(
+ new MapFn<Pair<Long, Collection<Pair<String, Long>>>, Collection<Pair<String, Double>>>() {
@Override
- public Pair<String, Collection<Pair<String, Double>>> map(
- Pair<String, Pair<Long, Collection<Pair<String, Long>>>> input) {
+ public Collection<Pair<String, Double>> map(
+ Pair<Long, Collection<Pair<String, Long>>> input) {
Collection<Pair<String, Double>> tfidfs = Lists.newArrayList();
- String word = input.first();
- double n = input.second().first();
+ double n = input.first();
double idf = Math.log(N / n);
- for (Pair<String, Long> tf : input.second().second()) {
+ for (Pair<String, Long> tf : input.second()) {
double tfidf = tf.second() * idf;
tfidfs.add(Pair.of(tf.first(), tfidf));
}
- return Pair.of(word, tfidfs);
+ return tfidfs;
}
- }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles()))));
+ }, ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles())));
}
public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean singleRun) throws IOException {
@@ -187,13 +185,13 @@ public class TfIdfIT implements Serializable {
pipeline.run();
}
- PTable<String, Collection<Pair<String, Double>>> uppercased = results.parallelDo(
- new MapKeysFn<String, String, Collection<Pair<String, Double>>>() {
+ PTable<String, Collection<Pair<String, Double>>> uppercased = results.mapKeys(
+ new MapFn<String, String>() {
@Override
public String map(String k1) {
return k1.toUpperCase();
}
- }, results.getPTableType());
+ }, results.getKeyType());
pipeline.writeTextFile(uppercased, outputPath2);
pipeline.done();
http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
index d77ffdb..68085c6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
@@ -18,10 +18,12 @@
package org.apache.crunch;
import org.apache.crunch.Aggregator;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PType;
/**
- * The Crunch representation of a grouped {@link PTable}.
- *
+ * The Crunch representation of a grouped {@link PTable}, which corresponds to the output of
+ * the shuffle phase of a MapReduce job.
*/
public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
@@ -45,9 +47,38 @@ public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
PTable<K, V> combineValues(Aggregator<V> aggregator);
/**
+ * Maps the {@code Iterable<V>} elements of each record to a new type. Just like
+ * any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be
+ * called once.
+ *
+ * @param mapFn The mapping function
+ * @param ptype The serialization information for the returned data
+ * @return A new {@code PTable} instance
+ */
+ <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype);
+
+ /**
+ * Maps the {@code Iterable<V>} elements of each record to a new type. Just like
+ * any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be
+ * called once.
+ *
+ * @param name A name for this operation
+ * @param mapFn The mapping function
+ * @param ptype The serialization information for the returned data
+ * @return A new {@code PTable} instance
+ */
+ <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype);
+
+ /**
* Convert this grouping back into a multimap.
*
* @return an ungrouped version of the data in this {@code PGroupedTable}.
*/
PTable<K, V> ungroup();
+
+ /**
+ * Return the {@code PGroupedTableType} containing serialization information for
+ * this {@code PGroupedTable}.
+ */
+ PGroupedTableType<K, V> getGroupedTableType();
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/PTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PTable.java b/crunch-core/src/main/java/org/apache/crunch/PTable.java
index 8df9853..738b3cb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PTable.java
@@ -96,6 +96,30 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
PType<V> getValueType();
/**
+ * Returns a {@code PTable} that has the same keys as this instance, but
+ * uses the given function to map the values.
+ */
+ <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype);
+
+ /**
+ * Returns a {@code PTable} that has the same keys as this instance, but
+ * uses the given function to map the values.
+ */
+ <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype);
+
+ /**
+ * Returns a {@code PTable} that has the same values as this instance, but
+ * uses the given function to map the keys.
+ */
+ <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype);
+
+ /**
+ * Returns a {@code PTable} that has the same values as this instance, but
+ * uses the given function to map the keys.
+ */
+ <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype);
+
+ /**
* Aggregate all of the values with the same key into a single key-value pair
* in the returned PTable.
*/
http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
index cbaf24d..1dd8130 100644
--- a/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
@@ -21,6 +21,9 @@ import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.Pair;
+/**
+ * @deprecated Use {@link org.apache.crunch.PTable#mapKeys(org.apache.crunch.MapFn, org.apache.crunch.types.PType)}
+ */
public abstract class MapKeysFn<K1, K2, V> extends DoFn<Pair<K1, V>, Pair<K2, V>> {
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
index b90f5ff..9b171f4 100644
--- a/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
@@ -21,6 +21,9 @@ import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.Pair;
+/**
+ * @deprecated Use {@link org.apache.crunch.PTable#mapValues(org.apache.crunch.MapFn, org.apache.crunch.types.PType)}
+ */
public abstract class MapValuesFn<K, V1, V2> extends DoFn<Pair<K, V1>, Pair<K, V2>> {
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index d105bb4..12c17b6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -25,6 +25,7 @@ import java.util.TreeMap;
import org.apache.crunch.Aggregator;
import org.apache.crunch.CombineFn;
import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
@@ -32,6 +33,8 @@ import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.Target;
import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.types.PGroupedTableType;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
@@ -74,13 +77,18 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
@Override
public PType<Pair<K, Iterable<V>>> getPType() {
+ return getGroupedTableType();
+ }
+
+ @Override
+ public PGroupedTableType<K, V> getGroupedTableType() {
PTableType<K, V> parentType = parent.getPTableType();
if (parentType != null) {
return parentType.getGroupedTableType();
}
return null;
}
-
+
@Override
public PTypeFamily getTypeFamily() {
return parent.getTypeFamily();
@@ -107,6 +115,16 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
}
@Override
+ public <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
+ return PTables.mapValues(this, mapFn, ptype);
+ }
+
+ @Override
+ public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
+ return PTables.mapValues(name, this, mapFn, ptype);
+ }
+
+ @Override
public PTable<K, V> ungroup() {
return parent;
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
index f8a5960..99405e6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.crunch.FilterFn;
import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PObject;
@@ -129,6 +130,26 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
}
@Override
+ public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) {
+ return PTables.mapValues(this, mapFn, ptype);
+ }
+
+ @Override
+ public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) {
+ return PTables.mapValues(name, this, mapFn, ptype);
+ }
+
+ @Override
+ public <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype) {
+ return PTables.mapKeys(this, mapFn, ptype);
+ }
+
+ @Override
+ public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) {
+ return PTables.mapKeys(name, this, mapFn, ptype);
+ }
+
+ @Override
public PTable<K, V> top(int count) {
return Aggregate.top(this, count, true);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
index 7b8f2ea..8881e3f 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
@@ -18,16 +18,13 @@
package org.apache.crunch.impl.mr.collect;
import java.util.List;
-import java.util.Set;
import org.apache.crunch.DoFn;
import org.apache.crunch.ParallelDoOptions;
-import org.apache.crunch.SourceTarget;
import org.apache.crunch.impl.mr.plan.DoNode;
import org.apache.crunch.types.PType;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
public class DoCollectionImpl<S> extends PCollectionImpl<S> {
http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
index ccac5d5..d277b75 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -27,12 +27,14 @@ import org.apache.crunch.CombineFn;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.lib.PTables;
import org.apache.crunch.types.PGroupedTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.util.PartitionUtils;
@@ -103,11 +105,27 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
}
}
+ @Override
public PTable<K, V> ungroup() {
return parallelDo("ungroup", new Ungroup<K, V>(), parent.getPTableType());
}
@Override
+ public <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
+ return PTables.mapValues(this, mapFn, ptype);
+ }
+
+ @Override
+ public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
+ return PTables.mapValues(name, this, mapFn, ptype);
+ }
+
+ @Override
+ public PGroupedTableType<K, V> getGroupedTableType() {
+ return ptype;
+ }
+
+ @Override
protected void acceptInternal(PCollectionImpl.Visitor visitor) {
visitor.visitGroupedTable(this);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
index 3c2393d..c477fad 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.crunch.FilterFn;
import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PObject;
import org.apache.crunch.PTable;
@@ -119,6 +120,26 @@ abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements P
}
@Override
+ public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) {
+ return PTables.mapValues(this, mapFn, ptype);
+ }
+
+ @Override
+ public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) {
+ return PTables.mapValues(name, this, mapFn, ptype);
+ }
+
+ @Override
+ public <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype) {
+ return PTables.mapKeys(this, mapFn, ptype);
+ }
+
+ @Override
+ public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) {
+ return PTables.mapKeys(name, this, mapFn, ptype);
+ }
+
+ @Override
public PTable<K, V> top(int count) {
return Aggregate.top(this, count, true);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java b/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
index e907680..e0a3bf3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
@@ -21,11 +21,13 @@ import java.util.List;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.fn.PairMapFn;
import org.apache.crunch.types.PGroupedTableType;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
@@ -51,6 +53,103 @@ public class PTables {
DoFn<Pair<K, V>, Pair<K, V>> id = IdentityFn.getInstance();
return pcollect.parallelDo("asPTable", id, ptt);
}
+
+ /**
+ * Maps a {@code PTable<K1, V>} to a {@code PTable<K2, V>} using the given {@code MapFn<K1, K2>} on
+ * the keys of the {@code PTable}.
+ *
+ * @param ptable The {@code PTable} to be mapped
+ * @param mapFn The mapping function
+ * @param ptype The PType for the returned keys
+ * @return A new {@code PTable<K2, V>} instance
+ */
+ public static <K1, K2, V> PTable<K2, V> mapKeys(PTable<K1, V> ptable, MapFn<K1, K2> mapFn,
+ PType<K2> ptype) {
+ return mapKeys("PTables.mapKeys", ptable, mapFn, ptype);
+ }
+
+ /**
+ * Maps a {@code PTable<K1, V>} to a {@code PTable<K2, V>} using the given {@code MapFn<K1, K2>} on
+ * the keys of the {@code PTable}.
+ *
+ * @param name The name of the transform
+ * @param ptable The {@code PTable} to be mapped
+ * @param mapFn The mapping function
+ * @param ptype The PType for the returned keys
+ * @return A new {@code PTable<K2, V>} instance
+ */
+ public static <K1, K2, V> PTable<K2, V> mapKeys(String name, PTable<K1, V> ptable, MapFn<K1, K2> mapFn,
+ PType<K2> ptype) {
+ PTypeFamily ptf = ptable.getTypeFamily();
+ return ptable.parallelDo(name,
+ new PairMapFn<K1, V, K2, V>(mapFn, IdentityFn.<V>getInstance()),
+ ptf.tableOf(ptype, ptable.getValueType()));
+ }
+
+ /**
+ * Maps a {@code PTable<K, U>} to a {@code PTable<K, V>} using the given {@code MapFn<U, V>} on
+ * the values of the {@code PTable}.
+ *
+ * @param ptable The {@code PTable} to be mapped
+ * @param mapFn The mapping function
+ * @param ptype The PType for the returned values
+ * @return A new {@code PTable<K, V>} instance
+ */
+ public static <K, U, V> PTable<K, V> mapValues(PTable<K, U> ptable, MapFn<U, V> mapFn,
+ PType<V> ptype) {
+ return mapValues("PTables.mapValues", ptable, mapFn, ptype);
+ }
+
+ /**
+ * Maps a {@code PTable<K, U>} to a {@code PTable<K, V>} using the given {@code MapFn<U, V>} on
+ * the values of the {@code PTable}.
+ *
+ * @param name The name of the transform
+ * @param ptable The {@code PTable} to be mapped
+ * @param mapFn The mapping function
+ * @param ptype The PType for the returned values
+ * @return A new {@code PTable<K, V>} instance
+ */
+ public static <K, U, V> PTable<K, V> mapValues(String name, PTable<K, U> ptable, MapFn<U, V> mapFn,
+ PType<V> ptype) {
+ PTypeFamily ptf = ptable.getTypeFamily();
+ return ptable.parallelDo(name,
+ new PairMapFn<K, U, K, V>(IdentityFn.<K>getInstance(), mapFn),
+ ptf.tableOf(ptable.getKeyType(), ptype));
+ }
+
+ /**
+ * An analogue of the {@code mapValues} function for {@code PGroupedTable<K, U>} collections.
+ *
+ * @param ptable The {@code PGroupedTable} to be mapped
+ * @param mapFn The mapping function
+ * @param ptype The PType for the returned values
+ * @return A new {@code PTable<K, V>} instance
+ */
+ public static <K, U, V> PTable<K, V> mapValues(PGroupedTable<K, U> ptable,
+ MapFn<Iterable<U>, V> mapFn,
+ PType<V> ptype) {
+ return mapValues("PTables.mapValues", ptable, mapFn, ptype);
+ }
+
+ /**
+ * An analogue of the {@code mapValues} function for {@code PGroupedTable<K, U>} collections.
+ *
+ * @param name The name of the operation
+ * @param ptable The {@code PGroupedTable} to be mapped
+ * @param mapFn The mapping function
+ * @param ptype The PType for the returned values
+ * @return A new {@code PTable<K, V>} instance
+ */
+ public static <K, U, V> PTable<K, V> mapValues(String name,
+ PGroupedTable<K, U> ptable,
+ MapFn<Iterable<U>, V> mapFn,
+ PType<V> ptype) {
+ PTypeFamily ptf = ptable.getTypeFamily();
+ return ptable.parallelDo(name,
+ new PairMapFn<K, Iterable<U>, K, V>(IdentityFn.<K>getInstance(), mapFn),
+ ptf.tableOf((PType<K>) ptable.getPType().getSubTypes().get(0), ptype));
+ }
/**
* Extract the keys from the given {@code PTable<K, V>} as a {@code PCollection<K>}.