You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:21 UTC

[28/50] [abbrv] incubator-beam git commit: [BEAM-342] Implement Filter#greaterThan, etc with Filter#byPredicate

[BEAM-342] Implement Filter#greaterThan, etc with Filter#byPredicate


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3d87f8b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3d87f8b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3d87f8b9

Branch: refs/heads/python-sdk
Commit: 3d87f8b987e243c6b3d99ab67142301af7b65743
Parents: 6491100
Author: manuzhang <ow...@gmail.com>
Authored: Wed Jun 15 16:02:35 2016 +0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jun 20 15:14:30 2016 -0700

----------------------------------------------------------------------
 .../beam/examples/complete/AutoComplete.java    |   2 +-
 .../examples/complete/AutoCompleteTest.java     |  14 +-
 .../beam/examples/MinimalWordCountJava8.java    |   2 +-
 .../examples/complete/game/HourlyTeamScore.java |   6 +-
 .../examples/MinimalWordCountJava8Test.java     |   2 +-
 .../complete/game/HourlyTeamScoreTest.java      |   2 +-
 .../flink/examples/streaming/AutoComplete.java  |  12 +-
 .../org/apache/beam/sdk/transforms/Filter.java  | 128 +++++++------------
 .../apache/beam/sdk/transforms/FilterTest.java  |  63 +++------
 .../beam/sdk/transforms/FilterJava8Test.java    |   8 +-
 10 files changed, 89 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index d725e0a..3e4440c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -235,7 +235,7 @@ public class AutoComplete {
             .of(larger.get(1).apply(ParDo.of(new FlattenTops())))
             // ...together with those (previously excluded) candidates of length
             // exactly minPrefix...
-            .and(input.apply(Filter.byPredicate(
+            .and(input.apply(Filter.by(
                 new SerializableFunction<CompletionCandidate, Boolean>() {
                   @Override
                   public Boolean apply(CompletionCandidate c) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
index 93dd0be..b2ed9a2 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
@@ -85,13 +85,13 @@ public class AutoCompleteTest implements Serializable {
 
     PCollection<KV<String, List<CompletionCandidate>>> output =
       input.apply(new ComputeTopCompletions(2, recursive))
-           .apply(Filter.byPredicate(
-                        new SerializableFunction<KV<String, List<CompletionCandidate>>, Boolean>() {
-                          @Override
-                          public Boolean apply(KV<String, List<CompletionCandidate>> element) {
-                            return element.getKey().length() <= 2;
-                          }
-                      }));
+           .apply(Filter.by(
+               new SerializableFunction<KV<String, List<CompletionCandidate>>, Boolean>() {
+                 @Override
+                 public Boolean apply(KV<String, List<CompletionCandidate>> element) {
+                   return element.getKey().length() <= 2;
+                 }
+               }));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("a", parseList("apple:2", "apricot:1")),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index d491741..0ad1a04 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -55,7 +55,7 @@ public class MinimalWordCountJava8 {
     p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
      .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
          .withOutputType(TypeDescriptors.strings()))
-     .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
+     .apply(Filter.by((String word) -> !word.isEmpty()))
      .apply(Count.<String>perElement())
      .apply(MapElements
          .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index 845c56f..ba3983d 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -44,7 +44,7 @@ import java.util.TimeZone;
 /**
  * This class is the second in a series of four pipelines that tell a story in a 'gaming'
  * domain, following {@link UserScore}. In addition to the concepts introduced in {@link UserScore},
- * new concepts include: windowing and element timestamps; use of {@code Filter.byPredicate()}.
+ * new concepts include: windowing and element timestamps; use of {@code Filter.by()}.
  *
  * <p> This pipeline processes data collected from gaming events in batch, building on {@link
  * UserScore} but using fixed windows. It calculates the sum of scores per team, for each window,
@@ -164,10 +164,10 @@ public class HourlyTeamScore extends UserScore {
       // (to scoop up late-arriving events from the day we're analyzing), we need to weed out events
       // that fall after the time period we want to analyze.
       // [START DocInclude_HTSFilters]
-      .apply("FilterStartTime", Filter.byPredicate(
+      .apply("FilterStartTime", Filter.by(
           (GameActionInfo gInfo)
               -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
-      .apply("FilterEndTime", Filter.byPredicate(
+      .apply("FilterEndTime", Filter.by(
           (GameActionInfo gInfo)
               -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
       // [END DocInclude_HTSFilters]

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index f73250f..4dfa474 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -66,7 +66,7 @@ public class MinimalWordCountJava8Test implements Serializable {
     p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
      .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
          .withOutputType(TypeDescriptors.strings()))
-     .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
+     .apply(Filter.by((String word) -> !word.isEmpty()))
      .apply(Count.<String>perElement())
      .apply(MapElements
          .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
index 5ff615a..4254902 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
@@ -96,7 +96,7 @@ public class HourlyTeamScoreTest implements Serializable {
     PCollection<KV<String, Integer>> output = input
       .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
 
-      .apply("FilterStartTime", Filter.byPredicate(
+      .apply("FilterStartTime", Filter.by(
           (GameActionInfo gInfo)
               -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
       // run a map to access the fields in the result.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 9d1168b..d83e662 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -215,13 +215,13 @@ public class AutoComplete {
             // ...together with those (previously excluded) candidates of length
             // exactly minPrefix...
             .and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() {
-                    private static final long serialVersionUID = 0;
+              private static final long serialVersionUID = 0;
 
-                    @Override
-                    public Boolean apply(CompletionCandidate c) {
-                      return c.getValue().length() == minPrefix;
-                    }
-                  })))
+              @Override
+              public Boolean apply(CompletionCandidate c) {
+                return c.getValue().length() == minPrefix;
+              }
+            })))
             .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections())
             // ...set the key to be the minPrefix-length prefix...
             .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix)))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index 57796b8..a31799e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -41,7 +41,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
    * <pre> {@code
    * PCollection<String> wordList = ...;
    * PCollection<String> longWords =
-   *     wordList.apply(Filter.byPredicate(new MatchIfWordLengthGT(6)));
+   *     wordList.apply(Filter.by(new MatchIfWordLengthGT(6)));
    * } </pre>
    *
    * <p>See also {@link #lessThan}, {@link #lessThanEq},
@@ -50,25 +50,8 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
    * the elements' natural ordering.
    */
   public static <T, PredicateT extends SerializableFunction<T, Boolean>> Filter<T>
-  byPredicate(PredicateT predicate) {
-    return new Filter<T>("Filter", predicate);
-  }
-
-  /**
-   * @deprecated use {@link #byPredicate}, which returns a {@link Filter} transform instead of
-   * a {@link ParDo.Bound}.
-   */
-  @Deprecated
-  public static <T, PredicateT extends SerializableFunction<T, Boolean>> ParDo.Bound<T, T>
-  by(final PredicateT filterPred) {
-    return ParDo.named("Filter").of(new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        if (filterPred.apply(c.element()) == true) {
-          c.output(c.element());
-        }
-      }
-    });
+  by(PredicateT predicate) {
+    return new Filter<>(predicate);
   }
 
   /**
@@ -89,24 +72,16 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
    * inequalities with the specified value based on the elements'
    * natural ordering.
    *
-   * <p>See also {@link #byPredicate}, which returns elements
+   * <p>See also {@link #by}, which returns elements
    * that satisfy the given predicate.
    */
-  public static <T extends Comparable<T>> ParDo.Bound<T, T> lessThan(final T value) {
-    return ParDo.named("Filter.lessThan").of(new DoFn<T, T>() {
+  public static <T extends Comparable<T>> Filter<T> lessThan(final T value) {
+    return by(new SerializableFunction<T, Boolean>() {
       @Override
-      public void processElement(ProcessContext c) {
-        if (c.element().compareTo(value) < 0) {
-          c.output(c.element());
-        }
-      }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        Filter.populateDisplayData(builder, String.format("x < %s", value));
+      public Boolean apply(T input) {
+        return input.compareTo(value) < 0;
       }
-    });
+    }).described(String.format("x < %s", value));
   }
 
 
@@ -128,24 +103,16 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
    * inequalities with the specified value based on the elements'
    * natural ordering.
    *
-   * <p>See also {@link #byPredicate}, which returns elements
+   * <p>See also {@link #by}, which returns elements
    * that satisfy the given predicate.
    */
-  public static <T extends Comparable<T>> ParDo.Bound<T, T> greaterThan(final T value) {
-    return ParDo.named("Filter.greaterThan").of(new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        if (c.element().compareTo(value) > 0) {
-          c.output(c.element());
-        }
-      }
-
+  public static <T extends Comparable<T>> Filter<T> greaterThan(final T value) {
+    return by(new SerializableFunction<T, Boolean>() {
       @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        Filter.populateDisplayData(builder, String.format("x > %s", value));
+      public Boolean apply(T input) {
+        return input.compareTo(value) > 0;
       }
-    });
+    }).described(String.format("x > %s", value));
   }
 
   /**
@@ -166,24 +133,16 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
    * inequalities with the specified value based on the elements'
    * natural ordering.
    *
-   * <p>See also {@link #byPredicate}, which returns elements
+   * <p>See also {@link #by}, which returns elements
    * that satisfy the given predicate.
    */
-  public static <T extends Comparable<T>> ParDo.Bound<T, T> lessThanEq(final T value) {
-    return ParDo.named("Filter.lessThanEq").of(new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        if (c.element().compareTo(value) <= 0) {
-          c.output(c.element());
-        }
-      }
-
+  public static <T extends Comparable<T>> Filter<T> lessThanEq(final T value) {
+    return by(new SerializableFunction<T, Boolean>() {
       @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        Filter.populateDisplayData(builder, String.format("x \u2264 %s", value));
+      public Boolean apply(T input) {
+        return input.compareTo(value) <= 0;
       }
-    });
+    }).described(String.format("x \u2264 %s", value));
   }
 
   /**
@@ -204,46 +163,46 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
    * inequalities with the specified value based on the elements'
    * natural ordering.
    *
-   * <p>See also {@link #byPredicate}, which returns elements
+   * <p>See also {@link #by}, which returns elements
    * that satisfy the given predicate.
    */
-  public static <T extends Comparable<T>> ParDo.Bound<T, T> greaterThanEq(final T value) {
-    return ParDo.named("Filter.greaterThanEq").of(new DoFn<T, T>() {
+  public static <T extends Comparable<T>> Filter<T> greaterThanEq(final T value) {
+    return by(new SerializableFunction<T, Boolean>() {
       @Override
-      public void processElement(ProcessContext c) {
-        if (c.element().compareTo(value) >= 0) {
-          c.output(c.element());
-        }
+      public Boolean apply(T input) {
+        return input.compareTo(value) >= 0;
       }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        Filter.populateDisplayData(builder, String.format("x \u2265 %s", value));
-      }
-    });
+    }).described(String.format("x \u2265 %s", value));
   }
 
   ///////////////////////////////////////////////////////////////////////////////
 
   private SerializableFunction<T, Boolean> predicate;
+  private String predicateDescription;
 
   private Filter(SerializableFunction<T, Boolean> predicate) {
-    this.predicate = predicate;
+    this(predicate, "Filter.predicate");
   }
 
-  private Filter(String name, SerializableFunction<T, Boolean> predicate) {
-    super(name);
+  private Filter(SerializableFunction<T, Boolean> predicate,
+      String predicateDescription) {
     this.predicate = predicate;
+    this.predicateDescription = predicateDescription;
   }
 
-  public Filter<T> named(String name) {
-    return new Filter<>(name, predicate);
+  /**
+   * Returns a new {@link Filter} {@link PTransform} that's like this
+   * {@link PTransform} but with the specified description for {@link DisplayData}. Does not
+   * modify this {@link PTransform}.
+   */
+  Filter<T> described(String description) {
+    return new Filter<>(predicate, description);
+
   }
 
   @Override
   public PCollection<T> apply(PCollection<T> input) {
-    PCollection<T> output = input.apply(ParDo.named("Filter").of(new DoFn<T, T>() {
+    PCollection<T> output = input.apply(ParDo.of(new DoFn<T, T>() {
       @Override
       public void processElement(ProcessContext c) {
         if (predicate.apply(c.element()) == true) {
@@ -259,8 +218,9 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
     return input.getCoder();
   }
 
-  private static void populateDisplayData(
-      DisplayData.Builder builder, String predicateDescription) {
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
     builder.add(DisplayData.item("predicate", predicateDescription)
       .withLabel("Filter Predicate"));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
index 367bbc0..2edab05 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
@@ -21,7 +21,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 
 import static org.hamcrest.MatcherAssert.assertThat;
 
-import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -61,10 +60,9 @@ public class FilterTest implements Serializable {
     }
   }
 
-  @Deprecated
   @Test
   @Category(RunnableOnService.class)
-  public void testIdentityFilterBy() {
+  public void testIdentityFilterByPredicate() {
     TestPipeline p = TestPipeline.create();
 
     PCollection<Integer> output = p
@@ -75,10 +73,9 @@ public class FilterTest implements Serializable {
     p.run();
   }
 
-  @Deprecated
   @Test
-  @Category(NeedsRunner.class)
-  public void testNoFilter() {
+  @Category(RunnableOnService.class)
+  public void testNoFilterByPredicate() {
     TestPipeline p = TestPipeline.create();
 
     PCollection<Integer> output = p
@@ -89,10 +86,9 @@ public class FilterTest implements Serializable {
     p.run();
   }
 
-  @Deprecated
   @Test
   @Category(RunnableOnService.class)
-  public void testFilterBy() {
+  public void testFilterByPredicate() {
     TestPipeline p = TestPipeline.create();
 
     PCollection<Integer> output = p
@@ -105,81 +101,64 @@ public class FilterTest implements Serializable {
 
   @Test
   @Category(RunnableOnService.class)
-  public void testIdentityFilterByPredicate() {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<Integer> output = p
-        .apply(Create.of(591, 11789, 1257, 24578, 24799, 307))
-        .apply(Filter.byPredicate(new TrivialFn(true)));
-
-    PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307);
-    p.run();
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testNoFilterByPredicate() {
+  public void testFilterLessThan() {
     TestPipeline p = TestPipeline.create();
 
     PCollection<Integer> output = p
-        .apply(Create.of(1, 2, 4, 5))
-        .apply(Filter.byPredicate(new TrivialFn(false)));
+        .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
+        .apply(Filter.lessThan(4));
 
-    PAssert.that(output).empty();
+    PAssert.that(output).containsInAnyOrder(1, 2, 3);
     p.run();
   }
 
   @Test
   @Category(RunnableOnService.class)
-  public void testFilterByPredicate() {
+  public void testFilterGreaterThan() {
     TestPipeline p = TestPipeline.create();
 
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
-        .apply(Filter.byPredicate(new EvenFn()));
+        .apply(Filter.greaterThan(4));
 
-    PAssert.that(output).containsInAnyOrder(2, 4, 6);
+    PAssert.that(output).containsInAnyOrder(5, 6, 7);
     p.run();
   }
 
   @Test
   @Category(RunnableOnService.class)
-  public void testFilterLessThan() {
+  public void testFilterLessThanEq() {
     TestPipeline p = TestPipeline.create();
 
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
-        .apply(Filter.lessThan(4));
+        .apply(Filter.lessThanEq(4));
 
-    PAssert.that(output).containsInAnyOrder(1, 2, 3);
+    PAssert.that(output).containsInAnyOrder(1, 2, 3, 4);
     p.run();
   }
 
   @Test
   @Category(RunnableOnService.class)
-  public void testFilterGreaterThan() {
+  public void testFilterGreaterThanEq() {
     TestPipeline p = TestPipeline.create();
 
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
-        .apply(Filter.greaterThan(4));
+        .apply(Filter.greaterThanEq(4));
 
-    PAssert.that(output).containsInAnyOrder(5, 6, 7);
+    PAssert.that(output).containsInAnyOrder(4, 5, 6, 7);
     p.run();
   }
 
   @Test
   public void testDisplayData() {
-    ParDo.Bound<Integer, Integer> lessThan = Filter.lessThan(123);
-    assertThat(DisplayData.from(lessThan), hasDisplayItem("predicate", "x < 123"));
+    assertThat(DisplayData.from(Filter.lessThan(123)), hasDisplayItem("predicate", "x < 123"));
 
-    ParDo.Bound<Integer, Integer> lessThanOrEqual = Filter.lessThanEq(234);
-    assertThat(DisplayData.from(lessThanOrEqual), hasDisplayItem("predicate", "x \u2264 234"));
+    assertThat(DisplayData.from(Filter.lessThanEq(234)), hasDisplayItem("predicate", "x \u2264 234"));
 
-    ParDo.Bound<Integer, Integer> greaterThan = Filter.greaterThan(345);
-    assertThat(DisplayData.from(greaterThan), hasDisplayItem("predicate", "x > 345"));
+    assertThat(DisplayData.from(Filter.greaterThan(345)), hasDisplayItem("predicate", "x > 345"));
 
-    ParDo.Bound<Integer, Integer> greaterThanOrEqual = Filter.greaterThanEq(456);
-    assertThat(DisplayData.from(greaterThanOrEqual), hasDisplayItem("predicate", "x \u2265 456"));
+    assertThat(DisplayData.from(Filter.greaterThanEq(456)), hasDisplayItem("predicate", "x \u2265 456"));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
index 170071b..3c83be2 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
@@ -50,7 +50,7 @@ public class FilterJava8Test implements Serializable {
 
     PCollection<Integer> output = pipeline
         .apply(Create.of(591, 11789, 1257, 24578, 24799, 307))
-        .apply(Filter.byPredicate(i -> true));
+        .apply(Filter.by(i -> true));
 
     PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307);
     pipeline.run();
@@ -62,7 +62,7 @@ public class FilterJava8Test implements Serializable {
 
     PCollection<Integer> output = pipeline
         .apply(Create.of(1, 2, 4, 5))
-        .apply(Filter.byPredicate(i -> false));
+        .apply(Filter.by(i -> false));
 
     PAssert.that(output).empty();
     pipeline.run();
@@ -75,7 +75,7 @@ public class FilterJava8Test implements Serializable {
 
     PCollection<Integer> output = pipeline
         .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
-        .apply(Filter.byPredicate(i -> i % 2 == 0));
+        .apply(Filter.by(i -> i % 2 == 0));
 
     PAssert.that(output).containsInAnyOrder(2, 4, 6);
     pipeline.run();
@@ -105,7 +105,7 @@ public class FilterJava8Test implements Serializable {
 
     PCollection<Integer> output = pipeline
         .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
-        .apply(Filter.byPredicate(new EvenFilter()::isEven));
+        .apply(Filter.by(new EvenFilter()::isEven));
 
     PAssert.that(output).containsInAnyOrder(2, 4, 6);
     pipeline.run();