You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2013/04/05 21:34:49 UTC

git commit: CRUNCH-191 Detached retained values in Distinct

Updated Branches:
  refs/heads/master 64497fa4f -> 3e513cfab


CRUNCH-191 Detached retained values in Distinct


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3e513cfa
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3e513cfa
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3e513cfa

Branch: refs/heads/master
Commit: 3e513cfabf7d37321c868ea8007aa3c9d202e644
Parents: 64497fa
Author: Gabriel Reid <gr...@apache.org>
Authored: Fri Apr 5 21:32:31 2013 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Fri Apr 5 21:32:31 2013 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/lib/Distinct.java  |   14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/3e513cfa/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
index 533f3fb..994830d 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
@@ -69,7 +69,7 @@ public final class Distinct {
     PType<S> pt = input.getPType();
     PTypeFamily ptf = pt.getFamily();
     return input
-        .parallelDo("pre-distinct", new PreDistinctFn<S>(flushEvery), ptf.tableOf(pt, ptf.nulls()))
+        .parallelDo("pre-distinct", new PreDistinctFn<S>(flushEvery, pt), ptf.tableOf(pt, ptf.nulls()))
         .groupByKey()
         .parallelDo("post-distinct", new PostDistinctFn<S>(), pt);
   }
@@ -84,14 +84,22 @@ public final class Distinct {
   private static class PreDistinctFn<S> extends DoFn<S, Pair<S, Void>> {
     private final Set<S> values = Sets.newHashSet();
     private final int flushEvery;
+    private final PType<S> ptype;
     
-    public PreDistinctFn(int flushEvery) {
+    public PreDistinctFn(int flushEvery, PType<S> ptype) {
       this.flushEvery = flushEvery;
+      this.ptype = ptype;
+    }
+    
+    @Override
+    public void initialize() {
+      super.initialize();
+      ptype.initialize(getConfiguration());
     }
     
     @Override
     public void process(S input, Emitter<Pair<S, Void>> emitter) {
-      values.add(input);
+      values.add(ptype.getDetachedValue(input));
       if (values.size() > flushEvery) {
         cleanup(emitter);
       }