You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2013/01/14 16:59:39 UTC

git commit: CRUNCH-142 Call delegate setContext and configure

Updated Branches:
  refs/heads/master 018d16984 -> e8b9d4b2a


CRUNCH-142 Call delegate setContext and configure

Call the delegate setContext and configure methods from FilterFn
decorators (And, Or, Not).

Contributed by Dave Beech.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/e8b9d4b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/e8b9d4b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/e8b9d4b2

Branch: refs/heads/master
Commit: e8b9d4b2aa651503aa03a5a75f13aa6e6f9347f7
Parents: 018d169
Author: Gabriel Reid <gr...@apache.org>
Authored: Mon Jan 14 16:20:54 2013 +0100
Committer: Gabriel Reid <gr...@apache.org>
Committed: Mon Jan 14 16:47:15 2013 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/crunch/FilterFn.java  |   43 +++++++++++++++
 1 files changed, 43 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/e8b9d4b2/crunch/src/main/java/org/apache/crunch/FilterFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/FilterFn.java b/crunch/src/main/java/org/apache/crunch/FilterFn.java
index 467400f..010afed 100644
--- a/crunch/src/main/java/org/apache/crunch/FilterFn.java
+++ b/crunch/src/main/java/org/apache/crunch/FilterFn.java
@@ -20,6 +20,8 @@ package org.apache.crunch;
 import java.util.List;
 
 import org.apache.crunch.fn.FilterFns;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 import com.google.common.collect.ImmutableList;
 
@@ -63,6 +65,21 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
     public AndFn(FilterFn<S>... fns) {
       this.fns = ImmutableList.<FilterFn<S>> copyOf(fns);
     }
+    
+    @Override
+    public void configure(Configuration conf) {
+      for (FilterFn<S> fn : fns) {
+        fn.configure(conf);
+      }
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (FilterFn<S> fn : fns) {
+        fn.setContext(context);
+      }
+      initialize();
+    }
 
     @Override
     public boolean accept(S input) {
@@ -101,6 +118,21 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
     public OrFn(FilterFn<S>... fns) {
       this.fns = ImmutableList.<FilterFn<S>> copyOf(fns);
     }
+    
+    @Override
+    public void configure(Configuration conf) {
+      for (FilterFn<S> fn : fns) {
+        fn.configure(conf);
+      }
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (FilterFn<S> fn : fns) {
+        fn.setContext(context);
+      }
+      initialize();
+    }
 
     @Override
     public boolean accept(S input) {
@@ -139,8 +171,19 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
     public NotFn(FilterFn<S> base) {
       this.base = base;
     }
+    
+    @Override
+    public void configure(Configuration conf) {
+        base.configure(conf);
+    }
 
     @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      base.setContext(context);
+      initialize();
+    }
+    
+    @Override
     public boolean accept(S input) {
       return !base.accept(input);
     }