You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/20 16:21:30 UTC
[2/3] beam git commit: Move ValueInSingleWindow from testing package to values package
Move ValueInSingleWindow from testing package to values package
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b793fe0c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b793fe0c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b793fe0c
Branch: refs/heads/master
Commit: b793fe0ceba9f5e0a680f2e2fe0480264b546f3c
Parents: 097573a
Author: Reuven Lax <re...@google.com>
Authored: Wed Mar 15 09:05:55 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Mar 20 09:20:58 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/SplittableParDoTest.java | 2 +-
.../apache/beam/sdk/testing/GatherAllPanes.java | 1 +
.../org/apache/beam/sdk/testing/PAssert.java | 1 +
.../apache/beam/sdk/testing/PaneExtractors.java | 1 +
.../beam/sdk/testing/ValueInSingleWindow.java | 134 -------------------
.../apache/beam/sdk/transforms/DoFnTester.java | 2 +-
.../beam/sdk/values/ValueInSingleWindow.java | 134 +++++++++++++++++++
.../beam/sdk/testing/GatherAllPanesTest.java | 1 +
.../beam/sdk/testing/PaneExtractorsTest.java | 1 +
.../testing/ValueInSingleWindowCoderTest.java | 1 +
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 3 +-
12 files changed, 144 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b793fe0c/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 9402502..af547c2 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -41,7 +41,6 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.ValueInSingleWindow;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
@@ -58,6 +57,7 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
http://git-wip-us.apache.org/repos/asf/beam/blob/b793fe0c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
index bf2cd0b..6b24d95 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.util.IdentityWindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
/**
* Gathers all panes of each window into exactly one output.
http://git-wip-us.apache.org/repos/asf/beam/blob/b793fe0c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index f6a409a..412753c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -72,6 +72,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/beam/blob/b793fe0c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
index dd1fac9..f88efcb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
/**
* {@link PTransform PTransforms} which take an {@link Iterable} of {@link ValueInSingleWindow
http://git-wip-us.apache.org/repos/asf/beam/blob/b793fe0c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
deleted file mode 100644
index b746f6d..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.joda.time.Instant;
-
-/**
- * An immutable tuple of value, timestamp, window, and pane.
- *
- * @param <T> the type of the value
- */
-@AutoValue
-public abstract class ValueInSingleWindow<T> {
- /** Returns the value of this {@code ValueInSingleWindow}. */
- @Nullable
- public abstract T getValue();
-
- /** Returns the timestamp of this {@code ValueInSingleWindow}. */
- public abstract Instant getTimestamp();
-
- /** Returns the window of this {@code ValueInSingleWindow}. */
- public abstract BoundedWindow getWindow();
-
- /** Returns the pane of this {@code ValueInSingleWindow} in its window. */
- public abstract PaneInfo getPane();
-
- public static <T> ValueInSingleWindow<T> of(
- T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
- return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, paneInfo);
- }
-
- /** A coder for {@link ValueInSingleWindow}. */
- public static class Coder<T> extends StandardCoder<ValueInSingleWindow<T>> {
- private final org.apache.beam.sdk.coders.Coder<T> valueCoder;
- private final org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder;
-
- public static <T> Coder<T> of(
- org.apache.beam.sdk.coders.Coder<T> valueCoder,
- org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
- return new Coder<>(valueCoder, windowCoder);
- }
-
- @JsonCreator
- public static <T> Coder<T> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<org.apache.beam.sdk.coders.Coder<?>> components) {
- checkArgument(components.size() == 2, "Expecting 2 components, got %s", components.size());
- @SuppressWarnings("unchecked")
- org.apache.beam.sdk.coders.Coder<T> valueCoder =
- (org.apache.beam.sdk.coders.Coder<T>) components.get(0);
- @SuppressWarnings("unchecked")
- org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder =
- (org.apache.beam.sdk.coders.Coder<BoundedWindow>) components.get(1);
- return new Coder<>(valueCoder, windowCoder);
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- Coder(
- org.apache.beam.sdk.coders.Coder<T> valueCoder,
- org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
- this.valueCoder = valueCoder;
- this.windowCoder = (org.apache.beam.sdk.coders.Coder) windowCoder;
- }
-
- @Override
- public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream, Context context)
- throws IOException {
- Context nestedContext = context.nested();
- InstantCoder.of().encode(windowedElem.getTimestamp(), outStream, nestedContext);
- windowCoder.encode(windowedElem.getWindow(), outStream, nestedContext);
- PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext);
- valueCoder.encode(windowedElem.getValue(), outStream, context);
- }
-
- @Override
- public ValueInSingleWindow<T> decode(InputStream inStream, Context context) throws IOException {
- Context nestedContext = context.nested();
- Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
- BoundedWindow window = windowCoder.decode(inStream, nestedContext);
- PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, nestedContext);
- T value = valueCoder.decode(inStream, context);
- return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, pane);
- }
-
- @Override
- public List<? extends org.apache.beam.sdk.coders.Coder<?>> getCoderArguments() {
- // Coder arguments are coders for the type parameters of the coder - i.e. only T.
- return ImmutableList.of(valueCoder);
- }
-
- @Override
- public List<? extends org.apache.beam.sdk.coders.Coder<?>> getComponents() {
- // Coder components are all inner coders that it uses - i.e. both T and BoundedWindow.
- return ImmutableList.of(valueCoder, windowCoder);
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- valueCoder.verifyDeterministic();
- windowCoder.verifyDeterministic();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b793fe0c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index cc5281c..01c639a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -33,7 +33,6 @@ import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.ValueInSingleWindow;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -51,6 +50,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Instant;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/b793fe0c/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
new file mode 100644
index 0000000..c94190f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.joda.time.Instant;
+
+/**
+ * An immutable tuple of value, timestamp, window, and pane.
+ *
+ * @param <T> the type of the value
+ */
+@AutoValue
+public abstract class ValueInSingleWindow<T> {
+ /** Returns the value of this {@code ValueInSingleWindow}. */
+ @Nullable
+ public abstract T getValue();
+
+ /** Returns the timestamp of this {@code ValueInSingleWindow}. */
+ public abstract Instant getTimestamp();
+
+ /** Returns the window of this {@code ValueInSingleWindow}. */
+ public abstract BoundedWindow getWindow();
+
+ /** Returns the pane of this {@code ValueInSingleWindow} in its window. */
+ public abstract PaneInfo getPane();
+
+ public static <T> ValueInSingleWindow<T> of(
+ T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
+ return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, paneInfo);
+ }
+
+ /** A coder for {@link ValueInSingleWindow}. */
+ public static class Coder<T> extends StandardCoder<ValueInSingleWindow<T>> {
+ private final org.apache.beam.sdk.coders.Coder<T> valueCoder;
+ private final org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder;
+
+ public static <T> Coder<T> of(
+ org.apache.beam.sdk.coders.Coder<T> valueCoder,
+ org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+ return new Coder<>(valueCoder, windowCoder);
+ }
+
+ @JsonCreator
+ public static <T> Coder<T> of(
+ @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+ List<org.apache.beam.sdk.coders.Coder<?>> components) {
+ checkArgument(components.size() == 2, "Expecting 2 components, got %s", components.size());
+ @SuppressWarnings("unchecked")
+ org.apache.beam.sdk.coders.Coder<T> valueCoder =
+ (org.apache.beam.sdk.coders.Coder<T>) components.get(0);
+ @SuppressWarnings("unchecked")
+ org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder =
+ (org.apache.beam.sdk.coders.Coder<BoundedWindow>) components.get(1);
+ return new Coder<>(valueCoder, windowCoder);
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Coder(
+ org.apache.beam.sdk.coders.Coder<T> valueCoder,
+ org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+ this.valueCoder = valueCoder;
+ this.windowCoder = (org.apache.beam.sdk.coders.Coder) windowCoder;
+ }
+
+ @Override
+ public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream, Context context)
+ throws IOException {
+ Context nestedContext = context.nested();
+ InstantCoder.of().encode(windowedElem.getTimestamp(), outStream, nestedContext);
+ windowCoder.encode(windowedElem.getWindow(), outStream, nestedContext);
+ PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext);
+ valueCoder.encode(windowedElem.getValue(), outStream, context);
+ }
+
+ @Override
+ public ValueInSingleWindow<T> decode(InputStream inStream, Context context) throws IOException {
+ Context nestedContext = context.nested();
+ Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
+ BoundedWindow window = windowCoder.decode(inStream, nestedContext);
+ PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, nestedContext);
+ T value = valueCoder.decode(inStream, context);
+ return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, pane);
+ }
+
+ @Override
+ public List<? extends org.apache.beam.sdk.coders.Coder<?>> getCoderArguments() {
+ // Coder arguments are coders for the type parameters of the coder - i.e. only T.
+ return ImmutableList.of(valueCoder);
+ }
+
+ @Override
+ public List<? extends org.apache.beam.sdk.coders.Coder<?>> getComponents() {
+ // Coder components are all inner coders that it uses - i.e. both T and BoundedWindow.
+ return ImmutableList.of(valueCoder, windowCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ valueCoder.verifyDeterministic();
+ windowCoder.verifyDeterministic();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b793fe0c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
index a96e3f8..969bbc4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
http://git-wip-us.apache.org/repos/asf/beam/blob/b793fe0c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
index 7df2f89..8801bde 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/beam/blob/b793fe0c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java
index 3cc016e..9ce18d7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java
@@ -21,6 +21,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/beam/blob/b793fe0c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index d4ca23c..e9ea0e0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -93,7 +93,6 @@ import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.testing.ValueInSingleWindow;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -132,6 +131,7 @@ import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/beam/blob/b793fe0c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index cf96acc..dcc3800 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -108,7 +108,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup.CleanupOperation;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TransformingSource;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.TableRowWriter;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -132,7 +131,6 @@ import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.ValueInSingleWindow;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
@@ -161,6 +159,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.joda.time.Instant;