You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/18 20:41:12 UTC
[1/2] beam git commit: [BEAM-659] WindowFn#isCompatible should
provide a meaningful reason
Repository: beam
Updated Branches:
refs/heads/master 8ef812def -> 8c572ef0b
[BEAM-659] WindowFn#isCompatible should provide a meaningful reason
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a8d2125e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a8d2125e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a8d2125e
Branch: refs/heads/master
Commit: a8d2125e5783a556056e88dad8fe3c0a397920d5
Parents: 23731fe
Author: huafengw <fv...@gmail.com>
Authored: Tue May 9 11:21:44 2017 +0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 18 13:27:45 2017 -0700
----------------------------------------------------------------------
.../core/construction/PCollectionsTest.java | 12 +++++++
.../direct/WindowEvaluatorFactoryTest.java | 10 ++++++
.../apache/beam/sdk/testing/StaticWindows.java | 12 +++++++
.../transforms/windowing/CalendarWindows.java | 36 +++++++++++++++++++
.../sdk/transforms/windowing/FixedWindows.java | 11 ++++++
.../sdk/transforms/windowing/GlobalWindows.java | 11 ++++++
.../windowing/IncompatibleWindowException.java | 38 ++++++++++++++++++++
.../transforms/windowing/InvalidWindows.java | 11 ++++++
.../beam/sdk/transforms/windowing/Sessions.java | 11 ++++++
.../transforms/windowing/SlidingWindows.java | 11 ++++++
.../beam/sdk/transforms/windowing/WindowFn.java | 21 +++++++++++
.../apache/beam/sdk/util/IdentityWindowFn.java | 11 ++++++
.../beam/sdk/testing/StaticWindowsTest.java | 12 +++++++
.../windowing/CalendarWindowsTest.java | 24 +++++++++++++
.../transforms/windowing/FixedWindowsTest.java | 7 ++++
.../sdk/transforms/windowing/SessionsTest.java | 14 ++++++++
.../windowing/SlidingWindowsTest.java | 11 ++++++
.../sdk/transforms/windowing/WindowTest.java | 8 +++++
.../sdk/util/IdentitySideInputWindowFn.java | 4 +++
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 13 +++++++
20 files changed, 288 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
index 2c45cbd..66700d0 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
@@ -44,6 +44,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -158,6 +159,17 @@ public class PCollectionsTest {
}
@Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ if (!this.isCompatible(other)) {
+ throw new IncompatibleWindowException(
+ other,
+ String.format(
+ "%s is only compatible with %s.",
+ CustomWindows.class.getSimpleName(), CustomWindows.class.getSimpleName()));
+ }
+ }
+
+ @Override
public Coder<BoundedWindow> windowCoder() {
return new AtomicCoder<BoundedWindow>() {
@Override public void verifyDeterministic() {}
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index a91bab5..96fdfab 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -304,6 +305,15 @@ public class WindowEvaluatorFactoryTest {
}
@Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ throw new IncompatibleWindowException(
+ other,
+ String.format(
+ "%s is not compatible with any other %s.",
+ EvaluatorTestWindowFn.class.getSimpleName(), WindowFn.class.getSimpleName()));
+ }
+
+ @Override
public Coder<BoundedWindow> windowCoder() {
@SuppressWarnings({"unchecked", "rawtypes"}) Coder coder =
(Coder) GlobalWindow.Coder.INSTANCE;
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
index fde1669..c11057a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.Objects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
@@ -97,6 +98,17 @@ final class StaticWindows extends NonMergingWindowFn<Object, BoundedWindow> {
}
@Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ if (!this.isCompatible(other)) {
+ throw new IncompatibleWindowException(
+ other,
+ String.format(
+ "Only %s objects with the same window supplier are compatible.",
+ StaticWindows.class.getSimpleName()));
+ }
+ }
+
+ @Override
public Coder<BoundedWindow> windowCoder() {
return coder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java
index fada50a..989c431 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java
@@ -145,6 +145,18 @@ public class CalendarWindows {
}
@Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ if (!this.isCompatible(other)) {
+ throw new IncompatibleWindowException(
+ other,
+ String.format(
+ "Only %s objects with the same number of days, start date "
+ + "and time zone are compatible.",
+ DaysWindows.class.getSimpleName()));
+ }
+ }
+
+ @Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
@@ -245,6 +257,18 @@ public class CalendarWindows {
}
@Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ if (!this.isCompatible(other)) {
+ throw new IncompatibleWindowException(
+ other,
+ String.format(
+ "Only %s objects with the same number of months, "
+ + "day of month, start date and time zone are compatible.",
+ MonthsWindows.class.getSimpleName()));
+ }
+ }
+
+ @Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
@@ -354,6 +378,18 @@ public class CalendarWindows {
}
@Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ if (!this.isCompatible(other)) {
+ throw new IncompatibleWindowException(
+ other,
+ String.format(
+ "Only %s objects with the same number of years, month of year, "
+ + "day of month, start date and time zone are compatible.",
+ YearsWindows.class.getSimpleName()));
+ }
+ }
+
+ @Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
index 8683a60..8b16916 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
@@ -101,6 +101,17 @@ public class FixedWindows extends PartitioningWindowFn<Object, IntervalWindow> {
return this.equals(other);
}
+ @Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ if (!this.isCompatible(other)) {
+ throw new IncompatibleWindowException(
+ other,
+ String.format(
+ "Only %s objects with the same size and offset are compatible.",
+ FixedWindows.class.getSimpleName()));
+ }
+ }
+
public Duration getSize() {
return size;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
index 400be1f..b49328b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
@@ -41,6 +41,17 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> {
}
@Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ if (!this.isCompatible(other)) {
+ throw new IncompatibleWindowException(
+ other,
+ String.format(
+ "%s is only compatible with %s.",
+ GlobalWindows.class.getSimpleName(), GlobalWindows.class.getSimpleName()));
+ }
+ }
+
+ @Override
public Coder<GlobalWindow> windowCoder() {
return GlobalWindow.Coder.INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java
new file mode 100644
index 0000000..b7b96ad
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+/**
+ * Exception thrown by {@link WindowFn#verifyCompatibility(WindowFn)} if two compared
+ * WindowFns are not compatible, including the explanation of incompatibility.
+ */
+public class IncompatibleWindowException extends Exception {
+ private WindowFn<?, ?> givenWindowFn;
+ private String reason;
+
+ public IncompatibleWindowException(WindowFn<?, ?> windowFn, String reason) {
+ this.givenWindowFn = windowFn;
+ this.reason = reason;
+ }
+
+ @Override
+ public String getMessage() {
+ String windowFn = givenWindowFn == null ? "null" : givenWindowFn.getClass().getSimpleName();
+ return String.format("The given WindowFn is %s. %s", windowFn, reason);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java
index 92041fc..a8084f4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java
@@ -75,6 +75,17 @@ public class InvalidWindows<W extends BoundedWindow> extends WindowFn<Object, W>
}
@Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ if (!this.isCompatible(other)) {
+ throw new IncompatibleWindowException(
+ other,
+ String.format(
+ "Only %s objects with the same originalWindowFn are compatible.",
+ InvalidWindows.class.getSimpleName()));
+ }
+ }
+
+ @Override
public WindowMappingFn<W> getDefaultWindowMappingFn() {
throw new UnsupportedOperationException("InvalidWindows is not allowed in side inputs");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
index 5cc7c65..115a964 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
@@ -80,6 +80,17 @@ public class Sessions extends WindowFn<Object, IntervalWindow> {
}
@Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ if (!this.isCompatible(other)) {
+ throw new IncompatibleWindowException(
+ other,
+ String.format(
+ "%s is only compatible with %s.",
+ Sessions.class.getSimpleName(), Sessions.class.getSimpleName()));
+ }
+ }
+
+ @Override
public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
throw new UnsupportedOperationException("Sessions is not allowed in side inputs");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
index 650dc37..f657884 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
@@ -148,6 +148,17 @@ public class SlidingWindows extends NonMergingWindowFn<Object, IntervalWindow> {
}
@Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ if (!this.isCompatible(other)) {
+ throw new IncompatibleWindowException(
+ other,
+ String.format(
+ "Only %s objects with the same size, period and offset are compatible.",
+ SlidingWindows.class.getSimpleName()));
+ }
+ }
+
+ @Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
index 5ebbb41..e329c1d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
@@ -114,10 +114,31 @@ public abstract class WindowFn<T, W extends BoundedWindow>
/**
* Returns whether this performs the same merging as the given
* {@code WindowFn}.
+ *
+ * @deprecated please override verifyCompatibility to throw a useful error message;
+ * we will remove isCompatible at version 3.0.0
*/
+ @Deprecated
public abstract boolean isCompatible(WindowFn<?, ?> other);
/**
+ * Throw {@link IncompatibleWindowException} if this WindowFn does not perform the same merging as
+ * the given ${@code WindowFn}.
+ *
+ * @throws IncompatibleWindowException if compared WindowFns are not compatible.
+ */
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ if (!this.isCompatible(other)) {
+ throw new IncompatibleWindowException(
+ other,
+ String.format(
+ "%s is not compatible with %s",
+ this.getClass().getSimpleName(),
+ other.getClass().getSimpleName()));
+ }
+ }
+
+ /**
* Returns the {@link Coder} used for serializing the windows used
* by this windowFn.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
index a61e3a6..a4bfdda 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -84,6 +85,16 @@ public class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> {
}
@Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s.verifyCompatibility() should never be called."
+ + " It is a private implementation detail of sdk utilities."
+ + " This message indicates a bug in the Beam SDK.",
+ getClass().getCanonicalName()));
+ }
+
+ @Override
public Coder<BoundedWindow> windowCoder() {
// Safe because the prior WindowFn provides both the windows and the coder.
// The Coder is _not_ actually a coder for an arbitrary BoundedWindow.
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
index 7ee48c8..2969ca6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.hamcrest.Matchers;
@@ -93,4 +94,15 @@ public class StaticWindowsTest {
thrown.expectMessage("may not be empty");
StaticWindows.of(GlobalWindow.Coder.INSTANCE, ImmutableList.<GlobalWindow>of());
}
+
+ @Test
+ public void testCompatibility() throws IncompatibleWindowException {
+ StaticWindows staticWindows =
+ StaticWindows.of(IntervalWindow.getCoder(), ImmutableList.of(first, second));
+ staticWindows.verifyCompatibility(
+ StaticWindows.of(IntervalWindow.getCoder(), ImmutableList.of(first, second)));
+ thrown.expect(IncompatibleWindowException.class);
+ staticWindows.verifyCompatibility(
+ StaticWindows.of(IntervalWindow.getCoder(), ImmutableList.of(first)));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java
index cd562e9..c8c01f5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java
@@ -91,6 +91,14 @@ public class CalendarWindowsTest {
}
@Test
+ public void testDaysCompatibility() throws IncompatibleWindowException {
+ CalendarWindows.DaysWindows daysWindows = CalendarWindows.days(10);
+ daysWindows.verifyCompatibility(CalendarWindows.days(10));
+ thrown.expect(IncompatibleWindowException.class);
+ daysWindows.verifyCompatibility(CalendarWindows.days(9));
+ }
+
+ @Test
public void testWeeks() throws Exception {
Map<IntervalWindow, Set<String>> expected = new HashMap<>();
@@ -165,6 +173,14 @@ public class CalendarWindowsTest {
}
@Test
+ public void testMonthsCompatibility() throws IncompatibleWindowException {
+ CalendarWindows.MonthsWindows monthsWindows = CalendarWindows.months(10).beginningOnDay(15);
+ monthsWindows.verifyCompatibility(CalendarWindows.months(10).beginningOnDay(15));
+ thrown.expect(IncompatibleWindowException.class);
+ monthsWindows.verifyCompatibility(CalendarWindows.months(10).beginningOnDay(30));
+ }
+
+ @Test
public void testMultiMonths() throws Exception {
Map<IntervalWindow, Set<String>> expected = new HashMap<>();
@@ -239,6 +255,14 @@ public class CalendarWindowsTest {
}
@Test
+ public void testYearsCompatibility() throws IncompatibleWindowException {
+ CalendarWindows.YearsWindows yearsWindows = CalendarWindows.years(2017).beginningOnDay(1, 1);
+ yearsWindows.verifyCompatibility(CalendarWindows.years(2017).beginningOnDay(1, 1));
+ thrown.expect(IncompatibleWindowException.class);
+ yearsWindows.verifyCompatibility(CalendarWindows.years(2017).beginningOnDay(1, 2));
+ }
+
+ @Test
public void testTimeZone() throws Exception {
Map<IntervalWindow, Set<String>> expected = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
index 47c273a..80a534c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
@@ -149,6 +149,13 @@ public class FixedWindowsTest {
}
@Test
+ public void testVerifyCompatibility() throws IncompatibleWindowException {
+ FixedWindows.of(new Duration(10)).verifyCompatibility(FixedWindows.of(new Duration(10)));
+ thrown.expect(IncompatibleWindowException.class);
+ FixedWindows.of(new Duration(10)).verifyCompatibility(FixedWindows.of(new Duration(20)));
+ }
+
+ @Test
public void testValidOutputTimes() throws Exception {
for (long timestamp : Arrays.asList(200, 800, 700)) {
WindowFnTestUtils.validateGetOutputTimestamp(
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
index 9d94928..42c15b5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
@@ -36,7 +36,9 @@ import org.apache.beam.sdk.testing.WindowFnTestUtils;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -45,6 +47,8 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class SessionsTest {
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
@Test
public void testSimple() throws Exception {
@@ -106,6 +110,16 @@ public class SessionsTest {
Sessions.withGapDuration(new Duration(20))));
}
+ @Test
+ public void testVerifyCompatibility() throws IncompatibleWindowException {
+ Sessions.withGapDuration(new Duration(10))
+ .verifyCompatibility(Sessions.withGapDuration(new Duration(10)));
+
+ thrown.expect(IncompatibleWindowException.class);
+ Sessions.withGapDuration(new Duration(10))
+ .verifyCompatibility(FixedWindows.of(new Duration(10)));
+ }
+
/**
* Validates that the output timestamp for aggregate data falls within the acceptable range.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
index dd673d3..b14e221 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
@@ -34,7 +34,9 @@ import org.apache.beam.sdk.testing.WindowFnTestUtils;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -43,6 +45,8 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class SlidingWindowsTest {
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
@Test
public void testSimple() throws Exception {
@@ -153,6 +157,13 @@ public class SlidingWindowsTest {
}
@Test
+ public void testVerifyCompatibility() throws IncompatibleWindowException {
+ SlidingWindows.of(new Duration(10)).verifyCompatibility(SlidingWindows.of(new Duration(10)));
+ thrown.expect(IncompatibleWindowException.class);
+ SlidingWindows.of(new Duration(10)).verifyCompatibility(SlidingWindows.of(new Duration(20)));
+ }
+
+ @Test
public void testDefaultWindowMappingFn() {
// [40, 1040), [340, 1340), [640, 1640) ...
SlidingWindows slidingWindows = SlidingWindows.of(new Duration(1000))
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 92f6a9c..f536a9a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -301,6 +301,14 @@ public class WindowTest implements Serializable {
}
@Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ if (!this.isCompatible(other)) {
+ throw new IncompatibleWindowException(
+ other, "WindowOddEvenBuckets is only compatible with WindowOddEvenBuckets.");
+ }
+ }
+
+ @Override
public Coder<IntervalWindow> windowCoder() {
return new IntervalWindow.IntervalWindowCoder();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
index 2171466..32e23da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
@@ -43,6 +44,9 @@ public class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, Bound
}
@Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {}
+
+ @Override
public Coder<BoundedWindow> windowCoder() {
// not used
return (Coder) GlobalWindow.Coder.INSTANCE;
http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d60c721..ba0cea8 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -115,6 +115,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -698,6 +699,18 @@ public class BigQueryIOTest implements Serializable {
}
@Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ if (!this.isCompatible(other)) {
+ throw new IncompatibleWindowException(
+ other,
+ String.format(
+ "%s is only compatible with %s.",
+ PartitionedGlobalWindows.class.getSimpleName(),
+ PartitionedGlobalWindows.class.getSimpleName()));
+ }
+ }
+
+ @Override
public Coder<PartitionedGlobalWindow> windowCoder() {
return new PartitionedGlobalWindowCoder();
}
[2/2] beam git commit: This closes #2985: WindowFn#isCompatible
should provide a meaningful reason
Posted by ke...@apache.org.
This closes #2985: WindowFn#isCompatible should provide a meaningful reason
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8c572ef0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8c572ef0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8c572ef0
Branch: refs/heads/master
Commit: 8c572ef0b998a4fa529e2f4ba0e622b3db9f4ec6
Parents: 8ef812d a8d2125
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 18 13:27:59 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 18 13:27:59 2017 -0700
----------------------------------------------------------------------
.../core/construction/PCollectionsTest.java | 12 +++++++
.../direct/WindowEvaluatorFactoryTest.java | 10 ++++++
.../apache/beam/sdk/testing/StaticWindows.java | 12 +++++++
.../transforms/windowing/CalendarWindows.java | 36 +++++++++++++++++++
.../sdk/transforms/windowing/FixedWindows.java | 11 ++++++
.../sdk/transforms/windowing/GlobalWindows.java | 11 ++++++
.../windowing/IncompatibleWindowException.java | 38 ++++++++++++++++++++
.../transforms/windowing/InvalidWindows.java | 11 ++++++
.../beam/sdk/transforms/windowing/Sessions.java | 11 ++++++
.../transforms/windowing/SlidingWindows.java | 11 ++++++
.../beam/sdk/transforms/windowing/WindowFn.java | 21 +++++++++++
.../apache/beam/sdk/util/IdentityWindowFn.java | 11 ++++++
.../beam/sdk/testing/StaticWindowsTest.java | 12 +++++++
.../windowing/CalendarWindowsTest.java | 24 +++++++++++++
.../transforms/windowing/FixedWindowsTest.java | 7 ++++
.../sdk/transforms/windowing/SessionsTest.java | 14 ++++++++
.../windowing/SlidingWindowsTest.java | 11 ++++++
.../sdk/transforms/windowing/WindowTest.java | 8 +++++
.../sdk/util/IdentitySideInputWindowFn.java | 4 +++
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 13 +++++++
20 files changed, 288 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8c572ef0/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8c572ef0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8c572ef0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8c572ef0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8c572ef0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8c572ef0/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------