You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2013/04/05 21:34:49 UTC
git commit: CRUNCH-191 Detached retained values in Distinct
Updated Branches:
refs/heads/master 64497fa4f -> 3e513cfab
CRUNCH-191 Detached retained values in Distinct
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3e513cfa
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3e513cfa
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3e513cfa
Branch: refs/heads/master
Commit: 3e513cfabf7d37321c868ea8007aa3c9d202e644
Parents: 64497fa
Author: Gabriel Reid <gr...@apache.org>
Authored: Fri Apr 5 21:32:31 2013 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Fri Apr 5 21:32:31 2013 +0200
----------------------------------------------------------------------
.../main/java/org/apache/crunch/lib/Distinct.java | 14 +++++++++++---
1 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/3e513cfa/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
index 533f3fb..994830d 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
@@ -69,7 +69,7 @@ public final class Distinct {
PType<S> pt = input.getPType();
PTypeFamily ptf = pt.getFamily();
return input
- .parallelDo("pre-distinct", new PreDistinctFn<S>(flushEvery), ptf.tableOf(pt, ptf.nulls()))
+ .parallelDo("pre-distinct", new PreDistinctFn<S>(flushEvery, pt), ptf.tableOf(pt, ptf.nulls()))
.groupByKey()
.parallelDo("post-distinct", new PostDistinctFn<S>(), pt);
}
@@ -84,14 +84,22 @@ public final class Distinct {
private static class PreDistinctFn<S> extends DoFn<S, Pair<S, Void>> {
private final Set<S> values = Sets.newHashSet();
private final int flushEvery;
+ private final PType<S> ptype;
- public PreDistinctFn(int flushEvery) {
+ public PreDistinctFn(int flushEvery, PType<S> ptype) {
this.flushEvery = flushEvery;
+ this.ptype = ptype;
+ }
+
+ @Override
+ public void initialize() {
+ super.initialize();
+ ptype.initialize(getConfiguration());
}
@Override
public void process(S input, Emitter<Pair<S, Void>> emitter) {
- values.add(input);
+ values.add(ptype.getDetachedValue(input));
if (values.size() > flushEvery) {
cleanup(emitter);
}