You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/01/29 22:16:37 UTC

git commit: CRUNCH-150: Crunch lib function for converting PCollection<Pair<K, V>> to a PTable<K, V>.

Updated Branches:
  refs/heads/master 9a1c42760 -> 035b1b91d


CRUNCH-150: Crunch lib function for converting PCollection<Pair<K, V>> to a PTable<K, V>.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/035b1b91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/035b1b91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/035b1b91

Branch: refs/heads/master
Commit: 035b1b91d60c1ed5029135d73706ffd54b184a8c
Parents: 9a1c427
Author: Josh Wills <jw...@apache.org>
Authored: Thu Jan 24 14:55:26 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Jan 29 12:24:29 2013 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/lib/Distinct.java  |   15 +++++
 .../main/java/org/apache/crunch/lib/PTables.java   |   25 ++++++++
 .../main/java/org/apache/crunch/lib/Sample.java    |   47 ++++++++++++---
 3 files changed, 79 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/035b1b91/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
index 15f7205..533f3fb 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
@@ -22,6 +22,7 @@ import java.util.Set;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
@@ -48,6 +49,13 @@ public final class Distinct {
   }
   
   /**
+   * A {@code PTable<K, V>} analogue of the {@code distinct} function.
+   */
+  public static <K, V> PTable<K, V> distinct(PTable<K, V> input) {
+    return PTables.asPTable(distinct((PCollection<Pair<K, V>>) input));
+  }
+  
+  /**
    * A {@code distinct} operation that gives the client more control over how frequently
    * elements are flushed to disk in order to allow control over performance or
    * memory consumption.
@@ -66,6 +74,13 @@ public final class Distinct {
         .parallelDo("post-distinct", new PostDistinctFn<S>(), pt);
   }
   
+  /**
+   * A {@code PTable<K, V>} analogue of the {@code distinct} function.
+   */
+  public static <K, V> PTable<K, V> distinct(PTable<K, V> input, int flushEvery) {
+    return PTables.asPTable(distinct((PCollection<Pair<K, V>>) input, flushEvery));
+  }
+  
   private static class PreDistinctFn<S> extends DoFn<S, Pair<S, Void>> {
     private final Set<S> values = Sets.newHashSet();
     private final int flushEvery;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/035b1b91/crunch/src/main/java/org/apache/crunch/lib/PTables.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/PTables.java b/crunch/src/main/java/org/apache/crunch/lib/PTables.java
index e788656..e907680 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/PTables.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/PTables.java
@@ -25,9 +25,11 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
+import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
 
 import com.google.common.collect.Lists;
 
@@ -37,6 +39,24 @@ import com.google.common.collect.Lists;
  */
 public class PTables {
 
+  /**
+   * Convert the given {@code PCollection<Pair<K, V>>} to a {@code PTable<K, V>}.
+   * @param pcollect The {@code PCollection} to convert
+   * @return A {@code PTable} that contains the same data as the input {@code PCollection}
+   */
+  public static <K, V> PTable<K, V> asPTable(PCollection<Pair<K, V>> pcollect) {
+    PType<Pair<K, V>> pt = pcollect.getPType();
+    PTypeFamily ptf = pt.getFamily();
+    PTableType<K, V> ptt = ptf.tableOf(pt.getSubTypes().get(0), pt.getSubTypes().get(1));
+    DoFn<Pair<K, V>, Pair<K, V>> id = IdentityFn.getInstance();
+    return pcollect.parallelDo("asPTable", id, ptt);
+  }
+  
+  /**
+   * Extract the keys from the given {@code PTable<K, V>} as a {@code PCollection<K>}.
+   * @param ptable The {@code PTable}
+   * @return A {@code PCollection<K>}
+   */
   public static <K, V> PCollection<K> keys(PTable<K, V> ptable) {
     return ptable.parallelDo("PTables.keys", new DoFn<Pair<K, V>, K>() {
       @Override
@@ -46,6 +66,11 @@ public class PTables {
     }, ptable.getKeyType());
   }
 
+  /**
+   * Extract the values from the given {@code PTable<K, V>} as a {@code PCollection<V>}.
+   * @param ptable The {@code PTable}
+   * @return A {@code PCollection<V>}
+   */
   public static <K, V> PCollection<V> values(PTable<K, V> ptable) {
     return ptable.parallelDo("PTables.values", new DoFn<Pair<K, V>, V>() {
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/035b1b91/crunch/src/main/java/org/apache/crunch/lib/Sample.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Sample.java b/crunch/src/main/java/org/apache/crunch/lib/Sample.java
index 54f8731..5be2292 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Sample.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Sample.java
@@ -19,15 +19,16 @@ package org.apache.crunch.lib;
 
 import java.util.Random;
 
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
+import org.apache.crunch.FilterFn;
 import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
 
 import com.google.common.base.Preconditions;
 
 public class Sample {
 
-  private static class SamplerFn<S> extends DoFn<S, S> {
+  private static class SamplerFn<S> extends FilterFn<S> {
 
     private final long seed;
     private final double acceptanceProbability;
@@ -41,23 +42,53 @@ public class Sample {
 
     @Override
     public void initialize() {
-      r = new Random(seed);
+      if (r == null) {
+        r = new Random(seed);
+      }
     }
 
     @Override
-    public void process(S input, Emitter<S> emitter) {
-      if (r.nextDouble() < acceptanceProbability) {
-        emitter.emit(input);
-      }
+    public boolean accept(S input) {
+      return r.nextDouble() < acceptanceProbability;
     }
   }
 
+  /**
+   * Output records from the given {@code PCollection} with the given probability.
+   * 
+   * @param input The {@code PCollection} to sample from
+   * @param probability The probability (0.0 < p < 1.0)
+   * @return The output {@code PCollection} created from sampling
+   */
   public static <S> PCollection<S> sample(PCollection<S> input, double probability) {
     return sample(input, System.currentTimeMillis(), probability);
   }
 
+  /**
+   * Output records from the given {@code PCollection} using a given seed. Useful for unit
+   * testing.
+   * 
+   * @param input The {@code PCollection} to sample from
+   * @param seed The seed
+   * @param probability The probability (0.0 < p < 1.0)
+   * @return The output {@code PCollection} created from sampling
+   */
   public static <S> PCollection<S> sample(PCollection<S> input, long seed, double probability) {
     String stageName = String.format("sample(%.2f)", probability);
     return input.parallelDo(stageName, new SamplerFn<S>(seed, probability), input.getPType());
   }
+  
+  /**
+   * A {@code PTable<K, V>} analogue of the {@code sample} function.
+   */
+  public static <K, V> PTable<K, V> sample(PTable<K, V> input, double probability) {
+    return PTables.asPTable(sample((PCollection<Pair<K, V>>) input, probability));
+  }
+  
+  /**
+   * A {@code PTable<K, V>} analogue of the {@code sample} function.
+   */
+  public static <K, V> PTable<K, V> sample(PTable<K, V> input, long seed, double probability) {
+    return PTables.asPTable(sample((PCollection<Pair<K, V>>) input, seed, probability));
+  }
 }