You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:21 UTC
[28/50] [abbrv] incubator-beam git commit: [BEAM-342] Implement
Filter#greaterThan, etc with Filter#byPredicate
[BEAM-342] Implement Filter#greaterThan, etc with Filter#byPredicate
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3d87f8b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3d87f8b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3d87f8b9
Branch: refs/heads/python-sdk
Commit: 3d87f8b987e243c6b3d99ab67142301af7b65743
Parents: 6491100
Author: manuzhang <ow...@gmail.com>
Authored: Wed Jun 15 16:02:35 2016 +0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jun 20 15:14:30 2016 -0700
----------------------------------------------------------------------
.../beam/examples/complete/AutoComplete.java | 2 +-
.../examples/complete/AutoCompleteTest.java | 14 +-
.../beam/examples/MinimalWordCountJava8.java | 2 +-
.../examples/complete/game/HourlyTeamScore.java | 6 +-
.../examples/MinimalWordCountJava8Test.java | 2 +-
.../complete/game/HourlyTeamScoreTest.java | 2 +-
.../flink/examples/streaming/AutoComplete.java | 12 +-
.../org/apache/beam/sdk/transforms/Filter.java | 128 +++++++------------
.../apache/beam/sdk/transforms/FilterTest.java | 63 +++------
.../beam/sdk/transforms/FilterJava8Test.java | 8 +-
10 files changed, 89 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index d725e0a..3e4440c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -235,7 +235,7 @@ public class AutoComplete {
.of(larger.get(1).apply(ParDo.of(new FlattenTops())))
// ...together with those (previously excluded) candidates of length
// exactly minPrefix...
- .and(input.apply(Filter.byPredicate(
+ .and(input.apply(Filter.by(
new SerializableFunction<CompletionCandidate, Boolean>() {
@Override
public Boolean apply(CompletionCandidate c) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
index 93dd0be..b2ed9a2 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
@@ -85,13 +85,13 @@ public class AutoCompleteTest implements Serializable {
PCollection<KV<String, List<CompletionCandidate>>> output =
input.apply(new ComputeTopCompletions(2, recursive))
- .apply(Filter.byPredicate(
- new SerializableFunction<KV<String, List<CompletionCandidate>>, Boolean>() {
- @Override
- public Boolean apply(KV<String, List<CompletionCandidate>> element) {
- return element.getKey().length() <= 2;
- }
- }));
+ .apply(Filter.by(
+ new SerializableFunction<KV<String, List<CompletionCandidate>>, Boolean>() {
+ @Override
+ public Boolean apply(KV<String, List<CompletionCandidate>> element) {
+ return element.getKey().length() <= 2;
+ }
+ }));
PAssert.that(output).containsInAnyOrder(
KV.of("a", parseList("apple:2", "apricot:1")),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index d491741..0ad1a04 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -55,7 +55,7 @@ public class MinimalWordCountJava8 {
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
.withOutputType(TypeDescriptors.strings()))
- .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
+ .apply(Filter.by((String word) -> !word.isEmpty()))
.apply(Count.<String>perElement())
.apply(MapElements
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index 845c56f..ba3983d 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -44,7 +44,7 @@ import java.util.TimeZone;
/**
* This class is the second in a series of four pipelines that tell a story in a 'gaming'
* domain, following {@link UserScore}. In addition to the concepts introduced in {@link UserScore},
- * new concepts include: windowing and element timestamps; use of {@code Filter.byPredicate()}.
+ * new concepts include: windowing and element timestamps; use of {@code Filter.by()}.
*
* <p> This pipeline processes data collected from gaming events in batch, building on {@link
* UserScore} but using fixed windows. It calculates the sum of scores per team, for each window,
@@ -164,10 +164,10 @@ public class HourlyTeamScore extends UserScore {
// (to scoop up late-arriving events from the day we're analyzing), we need to weed out events
// that fall after the time period we want to analyze.
// [START DocInclude_HTSFilters]
- .apply("FilterStartTime", Filter.byPredicate(
+ .apply("FilterStartTime", Filter.by(
(GameActionInfo gInfo)
-> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
- .apply("FilterEndTime", Filter.byPredicate(
+ .apply("FilterEndTime", Filter.by(
(GameActionInfo gInfo)
-> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
// [END DocInclude_HTSFilters]
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index f73250f..4dfa474 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -66,7 +66,7 @@ public class MinimalWordCountJava8Test implements Serializable {
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
.withOutputType(TypeDescriptors.strings()))
- .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
+ .apply(Filter.by((String word) -> !word.isEmpty()))
.apply(Count.<String>perElement())
.apply(MapElements
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
index 5ff615a..4254902 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
@@ -96,7 +96,7 @@ public class HourlyTeamScoreTest implements Serializable {
PCollection<KV<String, Integer>> output = input
.apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
- .apply("FilterStartTime", Filter.byPredicate(
+ .apply("FilterStartTime", Filter.by(
(GameActionInfo gInfo)
-> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
// run a map to access the fields in the result.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 9d1168b..d83e662 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -215,13 +215,13 @@ public class AutoComplete {
// ...together with those (previously excluded) candidates of length
// exactly minPrefix...
.and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() {
- private static final long serialVersionUID = 0;
+ private static final long serialVersionUID = 0;
- @Override
- public Boolean apply(CompletionCandidate c) {
- return c.getValue().length() == minPrefix;
- }
- })))
+ @Override
+ public Boolean apply(CompletionCandidate c) {
+ return c.getValue().length() == minPrefix;
+ }
+ })))
.apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections())
// ...set the key to be the minPrefix-length prefix...
.apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix)))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index 57796b8..a31799e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -41,7 +41,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
* <pre> {@code
* PCollection<String> wordList = ...;
* PCollection<String> longWords =
- * wordList.apply(Filter.byPredicate(new MatchIfWordLengthGT(6)));
+ * wordList.apply(Filter.by(new MatchIfWordLengthGT(6)));
* } </pre>
*
* <p>See also {@link #lessThan}, {@link #lessThanEq},
@@ -50,25 +50,8 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
* the elements' natural ordering.
*/
public static <T, PredicateT extends SerializableFunction<T, Boolean>> Filter<T>
- byPredicate(PredicateT predicate) {
- return new Filter<T>("Filter", predicate);
- }
-
- /**
- * @deprecated use {@link #byPredicate}, which returns a {@link Filter} transform instead of
- * a {@link ParDo.Bound}.
- */
- @Deprecated
- public static <T, PredicateT extends SerializableFunction<T, Boolean>> ParDo.Bound<T, T>
- by(final PredicateT filterPred) {
- return ParDo.named("Filter").of(new DoFn<T, T>() {
- @Override
- public void processElement(ProcessContext c) {
- if (filterPred.apply(c.element()) == true) {
- c.output(c.element());
- }
- }
- });
+ by(PredicateT predicate) {
+ return new Filter<>(predicate);
}
/**
@@ -89,24 +72,16 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
* inequalities with the specified value based on the elements'
* natural ordering.
*
- * <p>See also {@link #byPredicate}, which returns elements
+ * <p>See also {@link #by}, which returns elements
* that satisfy the given predicate.
*/
- public static <T extends Comparable<T>> ParDo.Bound<T, T> lessThan(final T value) {
- return ParDo.named("Filter.lessThan").of(new DoFn<T, T>() {
+ public static <T extends Comparable<T>> Filter<T> lessThan(final T value) {
+ return by(new SerializableFunction<T, Boolean>() {
@Override
- public void processElement(ProcessContext c) {
- if (c.element().compareTo(value) < 0) {
- c.output(c.element());
- }
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- Filter.populateDisplayData(builder, String.format("x < %s", value));
+ public Boolean apply(T input) {
+ return input.compareTo(value) < 0;
}
- });
+ }).described(String.format("x < %s", value));
}
@@ -128,24 +103,16 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
* inequalities with the specified value based on the elements'
* natural ordering.
*
- * <p>See also {@link #byPredicate}, which returns elements
+ * <p>See also {@link #by}, which returns elements
* that satisfy the given predicate.
*/
- public static <T extends Comparable<T>> ParDo.Bound<T, T> greaterThan(final T value) {
- return ParDo.named("Filter.greaterThan").of(new DoFn<T, T>() {
- @Override
- public void processElement(ProcessContext c) {
- if (c.element().compareTo(value) > 0) {
- c.output(c.element());
- }
- }
-
+ public static <T extends Comparable<T>> Filter<T> greaterThan(final T value) {
+ return by(new SerializableFunction<T, Boolean>() {
@Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- Filter.populateDisplayData(builder, String.format("x > %s", value));
+ public Boolean apply(T input) {
+ return input.compareTo(value) > 0;
}
- });
+ }).described(String.format("x > %s", value));
}
/**
@@ -166,24 +133,16 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
* inequalities with the specified value based on the elements'
* natural ordering.
*
- * <p>See also {@link #byPredicate}, which returns elements
+ * <p>See also {@link #by}, which returns elements
* that satisfy the given predicate.
*/
- public static <T extends Comparable<T>> ParDo.Bound<T, T> lessThanEq(final T value) {
- return ParDo.named("Filter.lessThanEq").of(new DoFn<T, T>() {
- @Override
- public void processElement(ProcessContext c) {
- if (c.element().compareTo(value) <= 0) {
- c.output(c.element());
- }
- }
-
+ public static <T extends Comparable<T>> Filter<T> lessThanEq(final T value) {
+ return by(new SerializableFunction<T, Boolean>() {
@Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- Filter.populateDisplayData(builder, String.format("x \u2264 %s", value));
+ public Boolean apply(T input) {
+ return input.compareTo(value) <= 0;
}
- });
+ }).described(String.format("x \u2264 %s", value));
}
/**
@@ -204,46 +163,46 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
* inequalities with the specified value based on the elements'
* natural ordering.
*
- * <p>See also {@link #byPredicate}, which returns elements
+ * <p>See also {@link #by}, which returns elements
* that satisfy the given predicate.
*/
- public static <T extends Comparable<T>> ParDo.Bound<T, T> greaterThanEq(final T value) {
- return ParDo.named("Filter.greaterThanEq").of(new DoFn<T, T>() {
+ public static <T extends Comparable<T>> Filter<T> greaterThanEq(final T value) {
+ return by(new SerializableFunction<T, Boolean>() {
@Override
- public void processElement(ProcessContext c) {
- if (c.element().compareTo(value) >= 0) {
- c.output(c.element());
- }
+ public Boolean apply(T input) {
+ return input.compareTo(value) >= 0;
}
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- Filter.populateDisplayData(builder, String.format("x \u2265 %s", value));
- }
- });
+ }).described(String.format("x \u2265 %s", value));
}
///////////////////////////////////////////////////////////////////////////////
private SerializableFunction<T, Boolean> predicate;
+ private String predicateDescription;
private Filter(SerializableFunction<T, Boolean> predicate) {
- this.predicate = predicate;
+ this(predicate, "Filter.predicate");
}
- private Filter(String name, SerializableFunction<T, Boolean> predicate) {
- super(name);
+ private Filter(SerializableFunction<T, Boolean> predicate,
+ String predicateDescription) {
this.predicate = predicate;
+ this.predicateDescription = predicateDescription;
}
- public Filter<T> named(String name) {
- return new Filter<>(name, predicate);
+ /**
+ * Returns a new {@link Filter} {@link PTransform} that's like this
+ * {@link PTransform} but with the specified description for {@link DisplayData}. Does not
+ * modify this {@link PTransform}.
+ */
+ Filter<T> described(String description) {
+ return new Filter<>(predicate, description);
+
}
@Override
public PCollection<T> apply(PCollection<T> input) {
- PCollection<T> output = input.apply(ParDo.named("Filter").of(new DoFn<T, T>() {
+ PCollection<T> output = input.apply(ParDo.of(new DoFn<T, T>() {
@Override
public void processElement(ProcessContext c) {
if (predicate.apply(c.element()) == true) {
@@ -259,8 +218,9 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
return input.getCoder();
}
- private static void populateDisplayData(
- DisplayData.Builder builder, String predicateDescription) {
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
builder.add(DisplayData.item("predicate", predicateDescription)
.withLabel("Filter Predicate"));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
index 367bbc0..2edab05 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
@@ -21,7 +21,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
import static org.hamcrest.MatcherAssert.assertThat;
-import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -61,10 +60,9 @@ public class FilterTest implements Serializable {
}
}
- @Deprecated
@Test
@Category(RunnableOnService.class)
- public void testIdentityFilterBy() {
+ public void testIdentityFilterByPredicate() {
TestPipeline p = TestPipeline.create();
PCollection<Integer> output = p
@@ -75,10 +73,9 @@ public class FilterTest implements Serializable {
p.run();
}
- @Deprecated
@Test
- @Category(NeedsRunner.class)
- public void testNoFilter() {
+ @Category(RunnableOnService.class)
+ public void testNoFilterByPredicate() {
TestPipeline p = TestPipeline.create();
PCollection<Integer> output = p
@@ -89,10 +86,9 @@ public class FilterTest implements Serializable {
p.run();
}
- @Deprecated
@Test
@Category(RunnableOnService.class)
- public void testFilterBy() {
+ public void testFilterByPredicate() {
TestPipeline p = TestPipeline.create();
PCollection<Integer> output = p
@@ -105,81 +101,64 @@ public class FilterTest implements Serializable {
@Test
@Category(RunnableOnService.class)
- public void testIdentityFilterByPredicate() {
- TestPipeline p = TestPipeline.create();
-
- PCollection<Integer> output = p
- .apply(Create.of(591, 11789, 1257, 24578, 24799, 307))
- .apply(Filter.byPredicate(new TrivialFn(true)));
-
- PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307);
- p.run();
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testNoFilterByPredicate() {
+ public void testFilterLessThan() {
TestPipeline p = TestPipeline.create();
PCollection<Integer> output = p
- .apply(Create.of(1, 2, 4, 5))
- .apply(Filter.byPredicate(new TrivialFn(false)));
+ .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
+ .apply(Filter.lessThan(4));
- PAssert.that(output).empty();
+ PAssert.that(output).containsInAnyOrder(1, 2, 3);
p.run();
}
@Test
@Category(RunnableOnService.class)
- public void testFilterByPredicate() {
+ public void testFilterGreaterThan() {
TestPipeline p = TestPipeline.create();
PCollection<Integer> output = p
.apply(Create.of(1, 2, 3, 4, 5, 6, 7))
- .apply(Filter.byPredicate(new EvenFn()));
+ .apply(Filter.greaterThan(4));
- PAssert.that(output).containsInAnyOrder(2, 4, 6);
+ PAssert.that(output).containsInAnyOrder(5, 6, 7);
p.run();
}
@Test
@Category(RunnableOnService.class)
- public void testFilterLessThan() {
+ public void testFilterLessThanEq() {
TestPipeline p = TestPipeline.create();
PCollection<Integer> output = p
.apply(Create.of(1, 2, 3, 4, 5, 6, 7))
- .apply(Filter.lessThan(4));
+ .apply(Filter.lessThanEq(4));
- PAssert.that(output).containsInAnyOrder(1, 2, 3);
+ PAssert.that(output).containsInAnyOrder(1, 2, 3, 4);
p.run();
}
@Test
@Category(RunnableOnService.class)
- public void testFilterGreaterThan() {
+ public void testFilterGreaterThanEq() {
TestPipeline p = TestPipeline.create();
PCollection<Integer> output = p
.apply(Create.of(1, 2, 3, 4, 5, 6, 7))
- .apply(Filter.greaterThan(4));
+ .apply(Filter.greaterThanEq(4));
- PAssert.that(output).containsInAnyOrder(5, 6, 7);
+ PAssert.that(output).containsInAnyOrder(4, 5, 6, 7);
p.run();
}
@Test
public void testDisplayData() {
- ParDo.Bound<Integer, Integer> lessThan = Filter.lessThan(123);
- assertThat(DisplayData.from(lessThan), hasDisplayItem("predicate", "x < 123"));
+ assertThat(DisplayData.from(Filter.lessThan(123)), hasDisplayItem("predicate", "x < 123"));
- ParDo.Bound<Integer, Integer> lessThanOrEqual = Filter.lessThanEq(234);
- assertThat(DisplayData.from(lessThanOrEqual), hasDisplayItem("predicate", "x \u2264 234"));
+ assertThat(DisplayData.from(Filter.lessThanEq(234)), hasDisplayItem("predicate", "x \u2264 234"));
- ParDo.Bound<Integer, Integer> greaterThan = Filter.greaterThan(345);
- assertThat(DisplayData.from(greaterThan), hasDisplayItem("predicate", "x > 345"));
+ assertThat(DisplayData.from(Filter.greaterThan(345)), hasDisplayItem("predicate", "x > 345"));
- ParDo.Bound<Integer, Integer> greaterThanOrEqual = Filter.greaterThanEq(456);
- assertThat(DisplayData.from(greaterThanOrEqual), hasDisplayItem("predicate", "x \u2265 456"));
+ assertThat(DisplayData.from(Filter.greaterThanEq(456)), hasDisplayItem("predicate", "x \u2265 456"));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
index 170071b..3c83be2 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
@@ -50,7 +50,7 @@ public class FilterJava8Test implements Serializable {
PCollection<Integer> output = pipeline
.apply(Create.of(591, 11789, 1257, 24578, 24799, 307))
- .apply(Filter.byPredicate(i -> true));
+ .apply(Filter.by(i -> true));
PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307);
pipeline.run();
@@ -62,7 +62,7 @@ public class FilterJava8Test implements Serializable {
PCollection<Integer> output = pipeline
.apply(Create.of(1, 2, 4, 5))
- .apply(Filter.byPredicate(i -> false));
+ .apply(Filter.by(i -> false));
PAssert.that(output).empty();
pipeline.run();
@@ -75,7 +75,7 @@ public class FilterJava8Test implements Serializable {
PCollection<Integer> output = pipeline
.apply(Create.of(1, 2, 3, 4, 5, 6, 7))
- .apply(Filter.byPredicate(i -> i % 2 == 0));
+ .apply(Filter.by(i -> i % 2 == 0));
PAssert.that(output).containsInAnyOrder(2, 4, 6);
pipeline.run();
@@ -105,7 +105,7 @@ public class FilterJava8Test implements Serializable {
PCollection<Integer> output = pipeline
.apply(Create.of(1, 2, 3, 4, 5, 6, 7))
- .apply(Filter.byPredicate(new EvenFilter()::isEven));
+ .apply(Filter.by(new EvenFilter()::isEven));
PAssert.that(output).containsInAnyOrder(2, 4, 6);
pipeline.run();