You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/01/29 22:16:37 UTC
git commit: CRUNCH-150: Crunch lib function for converting
PCollection<Pair<K, V>> to a PTable<K, V>.
Updated Branches:
refs/heads/master 9a1c42760 -> 035b1b91d
CRUNCH-150: Crunch lib function for converting PCollection<Pair<K, V>> to a PTable<K, V>.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/035b1b91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/035b1b91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/035b1b91
Branch: refs/heads/master
Commit: 035b1b91d60c1ed5029135d73706ffd54b184a8c
Parents: 9a1c427
Author: Josh Wills <jw...@apache.org>
Authored: Thu Jan 24 14:55:26 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Jan 29 12:24:29 2013 -0800
----------------------------------------------------------------------
.../main/java/org/apache/crunch/lib/Distinct.java | 15 +++++
.../main/java/org/apache/crunch/lib/PTables.java | 25 ++++++++
.../main/java/org/apache/crunch/lib/Sample.java | 47 ++++++++++++---
3 files changed, 79 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/035b1b91/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
index 15f7205..533f3fb 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
@@ -22,6 +22,7 @@ import java.util.Set;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
@@ -48,6 +49,13 @@ public final class Distinct {
}
/**
+ * A {@code PTable<K, V>} analogue of the {@code distinct} function.
+ */
+ public static <K, V> PTable<K, V> distinct(PTable<K, V> input) {
+ return PTables.asPTable(distinct((PCollection<Pair<K, V>>) input));
+ }
+
+ /**
* A {@code distinct} operation that gives the client more control over how frequently
* elements are flushed to disk in order to allow control over performance or
* memory consumption.
@@ -66,6 +74,13 @@ public final class Distinct {
.parallelDo("post-distinct", new PostDistinctFn<S>(), pt);
}
+ /**
+ * A {@code PTable<K, V>} analogue of the {@code distinct} function.
+ */
+ public static <K, V> PTable<K, V> distinct(PTable<K, V> input, int flushEvery) {
+ return PTables.asPTable(distinct((PCollection<Pair<K, V>>) input, flushEvery));
+ }
+
private static class PreDistinctFn<S> extends DoFn<S, Pair<S, Void>> {
private final Set<S> values = Sets.newHashSet();
private final int flushEvery;
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/035b1b91/crunch/src/main/java/org/apache/crunch/lib/PTables.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/PTables.java b/crunch/src/main/java/org/apache/crunch/lib/PTables.java
index e788656..e907680 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/PTables.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/PTables.java
@@ -25,9 +25,11 @@ import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
+import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.types.PGroupedTableType;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
import com.google.common.collect.Lists;
@@ -37,6 +39,24 @@ import com.google.common.collect.Lists;
*/
public class PTables {
+ /**
+ * Convert the given {@code PCollection<Pair<K, V>>} to a {@code PTable<K, V>}.
+ * @param pcollect The {@code PCollection} to convert
+ * @return A {@code PTable} that contains the same data as the input {@code PCollection}
+ */
+ public static <K, V> PTable<K, V> asPTable(PCollection<Pair<K, V>> pcollect) {
+ PType<Pair<K, V>> pt = pcollect.getPType();
+ PTypeFamily ptf = pt.getFamily();
+ PTableType<K, V> ptt = ptf.tableOf(pt.getSubTypes().get(0), pt.getSubTypes().get(1));
+ DoFn<Pair<K, V>, Pair<K, V>> id = IdentityFn.getInstance();
+ return pcollect.parallelDo("asPTable", id, ptt);
+ }
+
+ /**
+ * Extract the keys from the given {@code PTable<K, V>} as a {@code PCollection<K>}.
+ * @param ptable The {@code PTable}
+ * @return A {@code PCollection<K>}
+ */
public static <K, V> PCollection<K> keys(PTable<K, V> ptable) {
return ptable.parallelDo("PTables.keys", new DoFn<Pair<K, V>, K>() {
@Override
@@ -46,6 +66,11 @@ public class PTables {
}, ptable.getKeyType());
}
+ /**
+ * Extract the values from the given {@code PTable<K, V>} as a {@code PCollection<V>}.
+ * @param ptable The {@code PTable}
+ * @return A {@code PCollection<V>}
+ */
public static <K, V> PCollection<V> values(PTable<K, V> ptable) {
return ptable.parallelDo("PTables.values", new DoFn<Pair<K, V>, V>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/035b1b91/crunch/src/main/java/org/apache/crunch/lib/Sample.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Sample.java b/crunch/src/main/java/org/apache/crunch/lib/Sample.java
index 54f8731..5be2292 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Sample.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Sample.java
@@ -19,15 +19,16 @@ package org.apache.crunch.lib;
import java.util.Random;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
+import org.apache.crunch.FilterFn;
import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
import com.google.common.base.Preconditions;
public class Sample {
- private static class SamplerFn<S> extends DoFn<S, S> {
+ private static class SamplerFn<S> extends FilterFn<S> {
private final long seed;
private final double acceptanceProbability;
@@ -41,23 +42,53 @@ public class Sample {
@Override
public void initialize() {
- r = new Random(seed);
+ if (r == null) {
+ r = new Random(seed);
+ }
}
@Override
- public void process(S input, Emitter<S> emitter) {
- if (r.nextDouble() < acceptanceProbability) {
- emitter.emit(input);
- }
+ public boolean accept(S input) {
+ return r.nextDouble() < acceptanceProbability;
}
}
+ /**
+ * Output records from the given {@code PCollection} with the given probability.
+ *
+ * @param input The {@code PCollection} to sample from
+ * @param probability The probability (0.0 < p < 1.0)
+ * @return The output {@code PCollection} created from sampling
+ */
public static <S> PCollection<S> sample(PCollection<S> input, double probability) {
return sample(input, System.currentTimeMillis(), probability);
}
+ /**
+ * Output records from the given {@code PCollection} using a given seed. Useful for unit
+ * testing.
+ *
+ * @param input The {@code PCollection} to sample from
+ * @param seed The seed
+ * @param probability The probability (0.0 < p < 1.0)
+ * @return The output {@code PCollection} created from sampling
+ */
public static <S> PCollection<S> sample(PCollection<S> input, long seed, double probability) {
String stageName = String.format("sample(%.2f)", probability);
return input.parallelDo(stageName, new SamplerFn<S>(seed, probability), input.getPType());
}
+
+ /**
+ * A {@code PTable<K, V>} analogue of the {@code sample} function.
+ */
+ public static <K, V> PTable<K, V> sample(PTable<K, V> input, double probability) {
+ return PTables.asPTable(sample((PCollection<Pair<K, V>>) input, probability));
+ }
+
+ /**
+ * A {@code PTable<K, V>} analogue of the {@code sample} function.
+ */
+ public static <K, V> PTable<K, V> sample(PTable<K, V> input, long seed, double probability) {
+ return PTables.asPTable(sample((PCollection<Pair<K, V>>) input, seed, probability));
+ }
}