You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2013/01/14 16:59:39 UTC
git commit: CRUNCH-142 Call delegate setContext and configure
Updated Branches:
refs/heads/master 018d16984 -> e8b9d4b2a
CRUNCH-142 Call delegate setContext and configure
Call the delegate setContext and configure methods from FilterFn
decorators (And, Or, Not).
Contributed by Dave Beech.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/e8b9d4b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/e8b9d4b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/e8b9d4b2
Branch: refs/heads/master
Commit: e8b9d4b2aa651503aa03a5a75f13aa6e6f9347f7
Parents: 018d169
Author: Gabriel Reid <gr...@apache.org>
Authored: Mon Jan 14 16:20:54 2013 +0100
Committer: Gabriel Reid <gr...@apache.org>
Committed: Mon Jan 14 16:47:15 2013 +0100
----------------------------------------------------------------------
.../src/main/java/org/apache/crunch/FilterFn.java | 43 +++++++++++++++
1 files changed, 43 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/e8b9d4b2/crunch/src/main/java/org/apache/crunch/FilterFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/FilterFn.java b/crunch/src/main/java/org/apache/crunch/FilterFn.java
index 467400f..010afed 100644
--- a/crunch/src/main/java/org/apache/crunch/FilterFn.java
+++ b/crunch/src/main/java/org/apache/crunch/FilterFn.java
@@ -20,6 +20,8 @@ package org.apache.crunch;
import java.util.List;
import org.apache.crunch.fn.FilterFns;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import com.google.common.collect.ImmutableList;
@@ -63,6 +65,21 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
public AndFn(FilterFn<S>... fns) {
this.fns = ImmutableList.<FilterFn<S>> copyOf(fns);
}
+
+ @Override
+ public void configure(Configuration conf) {
+ for (FilterFn<S> fn : fns) {
+ fn.configure(conf);
+ }
+ }
+
+ @Override
+ public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+ for (FilterFn<S> fn : fns) {
+ fn.setContext(context);
+ }
+ initialize();
+ }
@Override
public boolean accept(S input) {
@@ -101,6 +118,21 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
public OrFn(FilterFn<S>... fns) {
this.fns = ImmutableList.<FilterFn<S>> copyOf(fns);
}
+
+ @Override
+ public void configure(Configuration conf) {
+ for (FilterFn<S> fn : fns) {
+ fn.configure(conf);
+ }
+ }
+
+ @Override
+ public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+ for (FilterFn<S> fn : fns) {
+ fn.setContext(context);
+ }
+ initialize();
+ }
@Override
public boolean accept(S input) {
@@ -139,8 +171,19 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
public NotFn(FilterFn<S> base) {
this.base = base;
}
+
+ @Override
+ public void configure(Configuration conf) {
+ base.configure(conf);
+ }
@Override
+ public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+ base.setContext(context);
+ initialize();
+ }
+
+ @Override
public boolean accept(S input) {
return !base.accept(input);
}