You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/05 00:04:19 UTC

[13/19] beam git commit: Move Reshuffle and ReshuffleTrigger out of util

Move Reshuffle and ReshuffleTrigger out of util

These are two deprecated, but regularly used, transforms and triggers, to the
point where the Runner API proto even includes something akin to the
ReshuffleTrigger.

They are moved to where they "belong" and marked deprecated.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0b3f806
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0b3f806
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0b3f806

Branch: refs/heads/master
Commit: e0b3f8064b97d8678e75bf6ba25244bca31e6a7d
Parents: 6542eaf
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 3 19:57:02 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 4 16:06:55 2017 -0700

----------------------------------------------------------------------
 .../runners/core/construction/Triggers.java     |   2 +-
 .../triggers/ReshuffleTriggerStateMachine.java  |   2 +-
 .../flink/FlinkBatchTransformTranslators.java   |   2 +-
 .../FlinkStreamingTransformTranslators.java     |   2 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   2 +-
 .../dataflow/ReshuffleOverrideFactory.java      |   4 +-
 .../translation/GroupCombineFunctions.java      |   3 +-
 .../spark/translation/TransformTranslator.java  |   2 +-
 .../streaming/StreamingTransformTranslator.java |   2 +-
 .../org/apache/beam/sdk/io/FileSystems.java     |   1 -
 .../beam/sdk/transforms/ReifyTimestamps.java    |  73 +++++
 .../apache/beam/sdk/transforms/Reshuffle.java   |  92 +++++++
 .../transforms/windowing/ReshuffleTrigger.java  |  58 ++++
 .../apache/beam/sdk/util/ReifyTimestamps.java   |  76 ------
 .../org/apache/beam/sdk/util/Reshuffle.java     |  89 ------
 .../apache/beam/sdk/util/ReshuffleTrigger.java  |  52 ----
 .../beam/sdk/transforms/GroupByKeyTest.java     |   1 -
 .../sdk/transforms/ReifyTimestampsTest.java     | 104 +++++++
 .../beam/sdk/transforms/ReshuffleTest.java      | 266 ++++++++++++++++++
 .../windowing/ReshuffleTriggerTest.java         |  42 +++
 .../beam/sdk/util/ReifyTimestampsTest.java      | 109 --------
 .../org/apache/beam/sdk/util/ReshuffleTest.java | 271 -------------------
 .../beam/sdk/util/ReshuffleTriggerTest.java     |  44 ---
 .../extensions/sorter/ExternalSorterTest.java   |   1 -
 .../apache/beam/sdk/io/common/HashingFn.java    |   2 -
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |   2 +-
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    |   2 -
 .../io/gcp/bigquery/BigQueryQuerySource.java    |   1 -
 .../sdk/io/gcp/bigquery/CalculateSchemas.java   |   1 -
 .../beam/sdk/io/gcp/bigquery/CreateTables.java  |   1 -
 .../io/gcp/bigquery/StreamingWriteTables.java   |   2 +-
 .../sdk/io/gcp/bigquery/TableDestination.java   |   1 -
 .../sdk/io/gcp/bigquery/WritePartition.java     |   1 -
 .../beam/sdk/io/gcp/bigquery/WriteRename.java   |   1 -
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |   1 -
 .../io/gcp/bigquery/FakeBigQueryServices.java   |   2 -
 .../sdk/io/gcp/bigquery/FakeJobService.java     |   3 -
 .../sdk/io/gcp/bigquery/TableContainer.java     |   1 -
 .../ConfigurableEmployeeInputFormat.java        |   1 -
 .../hadoop/inputformat/EmployeeInputFormat.java |   1 -
 .../ReuseObjectsEmployeeInputFormat.java        |   1 -
 .../hadoop/inputformat/TestEmployeeDataSet.java |   2 -
 .../inputformat/HIFIOWithElasticTest.java       |   1 -
 .../HIFIOWithEmbeddedCassandraTest.java         |   2 -
 .../integration/tests/HIFIOCassandraIT.java     |   2 -
 .../integration/tests/HIFIOElasticIT.java       |   1 -
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     |   2 -
 .../org/apache/beam/sdk/io/jdbc/JdbcIOIT.java   |   1 -
 .../org/apache/beam/sdk/io/jms/JmsRecord.java   |   1 -
 .../beam/sdk/io/kafka/KafkaCheckpointMark.java  |   2 -
 .../apache/beam/sdk/io/kafka/KafkaRecord.java   |   1 -
 .../CoderBasedKafkaDeserializer.java            |   1 -
 .../CoderBasedKafkaSerializer.java              |   1 -
 .../serialization/InstantDeserializer.java      |   1 -
 .../kafka/serialization/InstantSerializer.java  |   1 -
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    |   3 -
 .../sdk/io/mongodb/MongoDBGridFSIOTest.java     |   3 -
 .../beam/sdk/io/mongodb/MongoDbIOTest.java      |   4 -
 .../org/apache/beam/sdk/io/mqtt/MqttIO.java     |   3 -
 59 files changed, 648 insertions(+), 708 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java
index 5e73571..df6c9ed 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java
@@ -40,10 +40,10 @@ import org.apache.beam.sdk.transforms.windowing.Never;
 import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
 import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
 import org.apache.beam.sdk.transforms.windowing.TimestampTransform;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.ReshuffleTrigger;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachine.java
index 8a2b736..61ede34 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachine.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.core.triggers;
 
-import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.transforms.Reshuffle;
 
 /**
  * The trigger used with {@link Reshuffle} which triggers on every element

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 205cd3e..3689698 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
@@ -62,7 +63,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 3cf7683..615eaea 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -56,6 +56,7 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
@@ -66,7 +67,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ce824c6..7123316 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -110,6 +110,7 @@ import org.apache.beam.sdk.transforms.Combine.GroupedValues;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.View;
@@ -120,7 +121,6 @@ import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.util.ReleaseInfo;
-import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.ValueWithRecordId;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java
index d33fdfe..5814efd 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java
@@ -26,11 +26,11 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.IdentityWindowFn;
-import org.apache.beam.sdk.util.Reshuffle;
-import org.apache.beam.sdk.util.ReshuffleTrigger;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index d19094d..1fa46b5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -24,6 +24,7 @@ import org.apache.beam.runners.spark.util.ByteArray;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.KV;
@@ -223,7 +224,7 @@ public class GroupCombineFunctions {
 
   /**
    * An implementation of
-   * {@link org.apache.beam.sdk.util.Reshuffle} for the Spark runner.
+   * {@link Reshuffle} for the Spark runner.
    */
   public static <K, V> JavaRDD<WindowedValue<KV<K, V>>> reshuffle(
       JavaRDD<WindowedValue<KV<K, V>>> rdd,

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 77d2c0e..8a8e246 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -50,12 +50,12 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 08f0e17..2c4a747 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -66,13 +66,13 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index a3af8d9..2e11177 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -33,7 +33,6 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.collect.TreeMultimap;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
new file mode 100644
index 0000000..0b1ab25
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.transforms;
+
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/**
+ * {@link PTransform PTransforms} for reifying the timestamp of values and reemitting the original
+ * value with the original timestamp.
+ */
+class ReifyTimestamps {
+  private ReifyTimestamps() {}
+
+  /**
+   * Create a {@link PTransform} that will output all input {@link KV KVs} with the timestamp inside
+   * the value.
+   */
+  public static <K, V>
+      PTransform<PCollection<? extends KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>>
+          inValues() {
+    return ParDo.of(new ReifyValueTimestampDoFn<K, V>());
+  }
+
+  /**
+   * Create a {@link PTransform} that consumes {@link KV KVs} with a {@link TimestampedValue} as the
+   * value, and outputs the {@link KV} of the input key and value at the timestamp specified by the
+   * {@link TimestampedValue}.
+   */
+  public static <K, V>
+      PTransform<PCollection<? extends KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>>
+          extractFromValues() {
+    return ParDo.of(new ExtractTimestampedValueDoFn<K, V>());
+  }
+
+  private static class ReifyValueTimestampDoFn<K, V>
+      extends DoFn<KV<K, V>, KV<K, TimestampedValue<V>>> {
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      context.output(
+          KV.of(
+              context.element().getKey(),
+              TimestampedValue.of(context.element().getValue(), context.timestamp())));
+    }
+  }
+
+  private static class ExtractTimestampedValueDoFn<K, V>
+      extends DoFn<KV<K, TimestampedValue<V>>, KV<K, V>> {
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      KV<K, TimestampedValue<V>> kv = context.element();
+      context.outputWithTimestamp(
+          KV.of(kv.getKey(), kv.getValue().getValue()), kv.getValue().getTimestamp());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
new file mode 100644
index 0000000..5394826
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IdentityWindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Duration;
+
+/**
+ * <b>For internal use only; no backwards compatibility guarantees.</b>
+ *
+ * <p>A {@link PTransform} that returns a {@link PCollection} equivalent to its input but
+ * operationally provides some of the side effects of a {@link GroupByKey}, in particular preventing
+ * fusion of the surrounding transforms, checkpointing and deduplication by id.
+ *
+ * <p>Performs a {@link GroupByKey} so that the data is key-partitioned. Configures the {@link
+ * WindowingStrategy} so that no data is dropped, but doesn't affect the need for the user to
+ * specify allowed lateness and accumulation mode before a user-inserted GroupByKey.
+ *
+ * @param <K> The type of key being reshuffled on.
+ * @param <V> The type of value being reshuffled.
+ * @deprecated this transform's intended side effects are not portable; it will likely be removed
+ */
+@Internal
+@Deprecated
+public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+
+  private Reshuffle() {
+  }
+
+  public static <K, V> Reshuffle<K, V> of() {
+    return new Reshuffle<K, V>();
+  }
+
+  @Override
+  public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+    WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
+    // If the input has already had its windows merged, then the GBK that performed the merge
+    // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
+    // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
+    // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
+    // time.
+    // Because this outputs as fast as possible, this should not hold the watermark.
+    Window<KV<K, V>> rewindow =
+        Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
+            .triggering(new ReshuffleTrigger<>())
+            .discardingFiredPanes()
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+
+    return input.apply(rewindow)
+        .apply("ReifyOriginalTimestamps", ReifyTimestamps.<K, V>inValues())
+        .apply(GroupByKey.<K, TimestampedValue<V>>create())
+        // Set the windowing strategy directly, so that it doesn't get counted as the user having
+        // set allowed lateness.
+        .setWindowingStrategyInternal(originalStrategy)
+        .apply("ExpandIterable", ParDo.of(
+            new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, TimestampedValue<V>>>() {
+              @ProcessElement
+              public void processElement(ProcessContext c) {
+                K key = c.element().getKey();
+                for (TimestampedValue<V> value : c.element().getValue()) {
+                  c.output(KV.of(key, value));
+                }
+              }
+            }))
+        .apply("RestoreOriginalTimestamps", ReifyTimestamps.<K, V>extractFromValues());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java
new file mode 100644
index 0000000..ceb7011
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import java.util.List;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.joda.time.Instant;
+
+/**
+ * <b>For internal use only; no backwards compatibility guarantees.</b>
+ *
+ * <p>The trigger used with {@link Reshuffle} which triggers on every element and never buffers
+ * state.
+ *
+ * @param <W> The kind of window that is being reshuffled.
+ * @deprecated The intended side effect of {@link Reshuffle} is not portable; it will likely be
+ *     removed
+ */
+@Internal
+@Deprecated
+public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger {
+
+  public ReshuffleTrigger() {
+    super();
+  }
+
+  @Override
+  protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+    return this;
+  }
+
+  @Override
+  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+    throw new UnsupportedOperationException(
+        "ReshuffleTrigger should not be used outside of Reshuffle");
+  }
+
+  @Override
+  public String toString() {
+    return "ReshuffleTrigger()";
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestamps.java
deleted file mode 100644
index 3b291af..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestamps.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-
-/**
- * {@link PTransform PTransforms} for reifying the timestamp of values and reemitting the original
- * value with the original timestamp.
- */
-public class ReifyTimestamps {
-  private ReifyTimestamps() {}
-
-  /**
-   * Create a {@link PTransform} that will output all input {@link KV KVs} with the timestamp inside
-   * the value.
-   */
-  public static <K, V>
-      PTransform<PCollection<? extends KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>>
-          inValues() {
-    return ParDo.of(new ReifyValueTimestampDoFn<K, V>());
-  }
-
-  /**
-   * Create a {@link PTransform} that consumes {@link KV KVs} with a {@link TimestampedValue} as the
-   * value, and outputs the {@link KV} of the input key and value at the timestamp specified by the
-   * {@link TimestampedValue}.
-   */
-  public static <K, V>
-      PTransform<PCollection<? extends KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>>
-          extractFromValues() {
-    return ParDo.of(new ExtractTimestampedValueDoFn<K, V>());
-  }
-
-  private static class ReifyValueTimestampDoFn<K, V>
-      extends DoFn<KV<K, V>, KV<K, TimestampedValue<V>>> {
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      context.output(
-          KV.of(
-              context.element().getKey(),
-              TimestampedValue.of(context.element().getValue(), context.timestamp())));
-    }
-  }
-
-  private static class ExtractTimestampedValueDoFn<K, V>
-      extends DoFn<KV<K, TimestampedValue<V>>, KV<K, V>> {
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      KV<K, TimestampedValue<V>> kv = context.element();
-      context.outputWithTimestamp(
-          KV.of(kv.getKey(), kv.getValue().getValue()), kv.getValue().getTimestamp());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
deleted file mode 100644
index 887f011..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.joda.time.Duration;
-
-/**
- * A {@link PTransform} that returns a {@link PCollection} equivalent to its input but operationally
- * provides some of the side effects of a {@link GroupByKey}, in particular preventing fusion of
- * the surrounding transforms, checkpointing and deduplication by id (see
- * {@link ValueWithRecordId}).
- *
- * <p>Performs a {@link GroupByKey} so that the data is key-partitioned. Configures the
- * {@link WindowingStrategy} so that no data is dropped, but doesn't affect the need for
- * the user to specify allowed lateness and accumulation mode before a user-inserted GroupByKey.
- *
- * @param <K> The type of key being reshuffled on.
- * @param <V> The type of value being reshuffled.
- */
-public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
-
-  private Reshuffle() {
-  }
-
-  public static <K, V> Reshuffle<K, V> of() {
-    return new Reshuffle<K, V>();
-  }
-
-  @Override
-  public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
-    WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
-    // If the input has already had its windows merged, then the GBK that performed the merge
-    // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
-    // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
-    // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
-    // time.
-    // Because this outputs as fast as possible, this should not hold the watermark.
-    Window<KV<K, V>> rewindow =
-        Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
-            .triggering(new ReshuffleTrigger<>())
-            .discardingFiredPanes()
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
-            .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
-
-    return input.apply(rewindow)
-        .apply("ReifyOriginalTimestamps", ReifyTimestamps.<K, V>inValues())
-        .apply(GroupByKey.<K, TimestampedValue<V>>create())
-        // Set the windowing strategy directly, so that it doesn't get counted as the user having
-        // set allowed lateness.
-        .setWindowingStrategyInternal(originalStrategy)
-        .apply("ExpandIterable", ParDo.of(
-            new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, TimestampedValue<V>>>() {
-              @ProcessElement
-              public void processElement(ProcessContext c) {
-                K key = c.element().getKey();
-                for (TimestampedValue<V> value : c.element().getValue()) {
-                  c.output(KV.of(key, value));
-                }
-              }
-            }))
-        .apply("RestoreOriginalTimestamps", ReifyTimestamps.<K, V>extractFromValues());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
deleted file mode 100644
index 8dd648a..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.util.List;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.joda.time.Instant;
-
-/**
- * The trigger used with {@link Reshuffle} which triggers on every element
- * and never buffers state.
- *
- * @param <W> The kind of window that is being reshuffled.
- */
-public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger {
-
-  public ReshuffleTrigger() {
-    super();
-  }
-
-  @Override
-  protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-    return this;
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    throw new UnsupportedOperationException(
-        "ReshuffleTrigger should not be used outside of Reshuffle");
-  }
-
-  @Override
-  public String toString() {
-    return "ReshuffleTrigger()";
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 50e9c1d..9cb642a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -54,7 +54,6 @@ import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java
new file mode 100644
index 0000000..181433e
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ReifyTimestamps}.
+ */
+@RunWith(JUnit4.class)
+public class ReifyTimestampsTest implements Serializable {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void inValuesSucceeds() {
+    PCollection<KV<String, Integer>> timestamped =
+        pipeline
+            .apply(Create.of(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3)))
+            .apply(
+                WithTimestamps.of(
+                    new SerializableFunction<KV<String, Integer>, Instant>() {
+                      @Override
+                      public Instant apply(KV<String, Integer> input) {
+                        return new Instant(input.getValue().longValue());
+                      }
+                    }));
+
+    PCollection<KV<String, TimestampedValue<Integer>>> reified =
+        timestamped.apply(ReifyTimestamps.<String, Integer>inValues());
+
+    PAssert.that(reified)
+        .containsInAnyOrder(
+            KV.of("foo", TimestampedValue.of(0, new Instant(0))),
+            KV.of("foo", TimestampedValue.of(1, new Instant(1))),
+            KV.of("bar", TimestampedValue.of(2, new Instant(2))),
+            KV.of("baz", TimestampedValue.of(3, new Instant(3))));
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void extractFromValuesSucceeds() {
+    PCollection<KV<String, TimestampedValue<Integer>>> preified =
+        pipeline.apply(
+            Create.of(
+                KV.of("foo", TimestampedValue.of(0, new Instant((0)))),
+                KV.of("foo", TimestampedValue.of(1, new Instant(1))),
+                KV.of("bar", TimestampedValue.of(2, new Instant(2))),
+                KV.of("baz", TimestampedValue.of(3, new Instant(3)))));
+
+    PCollection<KV<String, Integer>> timestamped =
+        preified.apply(ReifyTimestamps.<String, Integer>extractFromValues());
+
+    PAssert.that(timestamped)
+        .containsInAnyOrder(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3));
+
+    timestamped.apply(
+        "AssertElementTimestamps",
+        ParDo.of(
+            new DoFn<KV<String, Integer>, Void>() {
+              @ProcessElement
+              public void verifyTimestampsEqualValue(ProcessContext context) {
+                assertThat(
+                    new Instant(context.element().getValue().longValue()),
+                    equalTo(context.timestamp()));
+              }
+            }));
+
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
new file mode 100644
index 0000000..1038fd6
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link Reshuffle}.
+ */
+@RunWith(JUnit4.class)
+public class ReshuffleTest implements Serializable {
+
+  private static final List<KV<String, Integer>> ARBITRARY_KVS = ImmutableList.of(
+        KV.of("k1", 3),
+        KV.of("k5", Integer.MAX_VALUE),
+        KV.of("k5", Integer.MIN_VALUE),
+        KV.of("k2", 66),
+        KV.of("k1", 4),
+        KV.of("k2", -33),
+        KV.of("k3", 0));
+
+  // TODO: test with more than one value per key
+  private static final List<KV<String, Integer>> GBK_TESTABLE_KVS = ImmutableList.of(
+        KV.of("k1", 3),
+        KV.of("k2", 4));
+
+  private static final List<KV<String, Iterable<Integer>>> GROUPED_TESTABLE_KVS = ImmutableList.of(
+        KV.of("k1", (Iterable<Integer>) ImmutableList.of(3)),
+        KV.of("k2", (Iterable<Integer>) ImmutableList.of(4)));
+
+  @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testJustReshuffle() {
+
+    PCollection<KV<String, Integer>> input = pipeline
+        .apply(Create.of(ARBITRARY_KVS)
+            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
+
+    PCollection<KV<String, Integer>> output = input
+        .apply(Reshuffle.<String, Integer>of());
+
+    PAssert.that(output).containsInAnyOrder(ARBITRARY_KVS);
+
+    assertEquals(
+        input.getWindowingStrategy(),
+        output.getWindowingStrategy());
+
+    pipeline.run();
+  }
+
+  /**
+   * Tests that timestamps are preserved after applying a {@link Reshuffle} with the default
+   * {@link WindowingStrategy}.
+   */
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testReshufflePreservesTimestamps() {
+    PCollection<KV<String, TimestampedValue<String>>> input =
+        pipeline
+            .apply(
+                Create.timestamped(
+                        TimestampedValue.of("foo", BoundedWindow.TIMESTAMP_MIN_VALUE),
+                        TimestampedValue.of("foo", new Instant(0)),
+                        TimestampedValue.of("bar", new Instant(33)),
+                        TimestampedValue.of("bar", GlobalWindow.INSTANCE.maxTimestamp()))
+                    .withCoder(StringUtf8Coder.of()))
+            .apply(
+                WithKeys.of(
+                    new SerializableFunction<String, String>() {
+                      @Override
+                      public String apply(String input) {
+                        return input;
+                      }
+                    }))
+            .apply("ReifyOriginalTimestamps", ReifyTimestamps.<String, String>inValues());
+
+    // The outer TimestampedValue is the reified timestamp post-reshuffle. The inner
+    // TimestampedValue is the pre-reshuffle timestamp.
+    PCollection<TimestampedValue<TimestampedValue<String>>> output =
+        input
+            .apply(Reshuffle.<String, TimestampedValue<String>>of())
+            .apply(
+                "ReifyReshuffledTimestamps",
+                ReifyTimestamps.<String, TimestampedValue<String>>inValues())
+            .apply(Values.<TimestampedValue<TimestampedValue<String>>>create());
+
+    PAssert.that(output)
+        .satisfies(
+            new SerializableFunction<Iterable<TimestampedValue<TimestampedValue<String>>>, Void>() {
+              @Override
+              public Void apply(Iterable<TimestampedValue<TimestampedValue<String>>> input) {
+                for (TimestampedValue<TimestampedValue<String>> elem : input) {
+                  Instant originalTimestamp = elem.getValue().getTimestamp();
+                  Instant afterReshuffleTimestamp = elem.getTimestamp();
+                  assertThat(
+                      "Reshuffle must preserve element timestamps",
+                      afterReshuffleTimestamp,
+                      equalTo(originalTimestamp));
+                }
+                return null;
+              }
+            });
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testReshuffleAfterSessionsAndGroupByKey() {
+
+    PCollection<KV<String, Iterable<Integer>>> input = pipeline
+        .apply(Create.of(GBK_TESTABLE_KVS)
+            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
+        .apply(Window.<KV<String, Integer>>into(
+            Sessions.withGapDuration(Duration.standardMinutes(10))))
+        .apply(GroupByKey.<String, Integer>create());
+
+    PCollection<KV<String, Iterable<Integer>>> output = input
+        .apply(Reshuffle.<String, Iterable<Integer>>of());
+
+    PAssert.that(output).containsInAnyOrder(GROUPED_TESTABLE_KVS);
+
+    assertEquals(
+        input.getWindowingStrategy(),
+        output.getWindowingStrategy());
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testReshuffleAfterFixedWindowsAndGroupByKey() {
+
+    PCollection<KV<String, Iterable<Integer>>> input = pipeline
+        .apply(Create.of(GBK_TESTABLE_KVS)
+            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
+        .apply(Window.<KV<String, Integer>>into(
+            FixedWindows.of(Duration.standardMinutes(10L))))
+        .apply(GroupByKey.<String, Integer>create());
+
+    PCollection<KV<String, Iterable<Integer>>> output = input
+        .apply(Reshuffle.<String, Iterable<Integer>>of());
+
+    PAssert.that(output).containsInAnyOrder(GROUPED_TESTABLE_KVS);
+
+    assertEquals(
+        input.getWindowingStrategy(),
+        output.getWindowingStrategy());
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testReshuffleAfterSlidingWindowsAndGroupByKey() {
+
+    PCollection<KV<String, Iterable<Integer>>> input = pipeline
+        .apply(Create.of(GBK_TESTABLE_KVS)
+            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
+        .apply(Window.<KV<String, Integer>>into(
+            FixedWindows.of(Duration.standardMinutes(10L))))
+        .apply(GroupByKey.<String, Integer>create());
+
+    PCollection<KV<String, Iterable<Integer>>> output = input
+        .apply(Reshuffle.<String, Iterable<Integer>>of());
+
+    PAssert.that(output).containsInAnyOrder(GROUPED_TESTABLE_KVS);
+
+    assertEquals(
+        input.getWindowingStrategy(),
+        output.getWindowingStrategy());
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testReshuffleAfterFixedWindows() {
+
+    PCollection<KV<String, Integer>> input = pipeline
+        .apply(Create.of(ARBITRARY_KVS)
+            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
+        .apply(Window.<KV<String, Integer>>into(
+            FixedWindows.of(Duration.standardMinutes(10L))));
+
+    PCollection<KV<String, Integer>> output = input
+        .apply(Reshuffle.<String, Integer>of());
+
+    PAssert.that(output).containsInAnyOrder(ARBITRARY_KVS);
+
+    assertEquals(
+        input.getWindowingStrategy(),
+        output.getWindowingStrategy());
+
+    pipeline.run();
+  }
+
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testReshuffleAfterSlidingWindows() {
+
+    PCollection<KV<String, Integer>> input = pipeline
+        .apply(Create.of(ARBITRARY_KVS)
+            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
+        .apply(Window.<KV<String, Integer>>into(
+            FixedWindows.of(Duration.standardMinutes(10L))));
+
+    PCollection<KV<String, Integer>> output = input
+        .apply(Reshuffle.<String, Integer>of());
+
+    PAssert.that(output).containsInAnyOrder(ARBITRARY_KVS);
+
+    assertEquals(
+        input.getWindowingStrategy(),
+        output.getWindowingStrategy());
+
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTriggerTest.java
new file mode 100644
index 0000000..5985ecb
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTriggerTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ReshuffleTrigger}.
+ */
+@RunWith(JUnit4.class)
+public class ReshuffleTriggerTest {
+
+  /** Public so that other tests can instantiate {@link ReshuffleTrigger}. */
+  public static <W extends BoundedWindow> ReshuffleTrigger<W> forTest() {
+    return new ReshuffleTrigger<>();
+  }
+
+  @Test
+  public void testToString() {
+    Trigger trigger = new ReshuffleTrigger<>();
+    assertEquals("ReshuffleTrigger()", trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
deleted file mode 100644
index 2942efd..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.WithTimestamps;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ReifyTimestamps}.
- */
-@RunWith(JUnit4.class)
-public class ReifyTimestampsTest implements Serializable {
-  @Rule public transient TestPipeline pipeline = TestPipeline.create();
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void inValuesSucceeds() {
-    PCollection<KV<String, Integer>> timestamped =
-        pipeline
-            .apply(Create.of(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3)))
-            .apply(
-                WithTimestamps.of(
-                    new SerializableFunction<KV<String, Integer>, Instant>() {
-                      @Override
-                      public Instant apply(KV<String, Integer> input) {
-                        return new Instant(input.getValue().longValue());
-                      }
-                    }));
-
-    PCollection<KV<String, TimestampedValue<Integer>>> reified =
-        timestamped.apply(ReifyTimestamps.<String, Integer>inValues());
-
-    PAssert.that(reified)
-        .containsInAnyOrder(
-            KV.of("foo", TimestampedValue.of(0, new Instant(0))),
-            KV.of("foo", TimestampedValue.of(1, new Instant(1))),
-            KV.of("bar", TimestampedValue.of(2, new Instant(2))),
-            KV.of("baz", TimestampedValue.of(3, new Instant(3))));
-
-    pipeline.run();
-  }
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void extractFromValuesSucceeds() {
-    PCollection<KV<String, TimestampedValue<Integer>>> preified =
-        pipeline.apply(
-            Create.of(
-                KV.of("foo", TimestampedValue.of(0, new Instant((0)))),
-                KV.of("foo", TimestampedValue.of(1, new Instant(1))),
-                KV.of("bar", TimestampedValue.of(2, new Instant(2))),
-                KV.of("baz", TimestampedValue.of(3, new Instant(3)))));
-
-    PCollection<KV<String, Integer>> timestamped =
-        preified.apply(ReifyTimestamps.<String, Integer>extractFromValues());
-
-    PAssert.that(timestamped)
-        .containsInAnyOrder(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3));
-
-    timestamped.apply(
-        "AssertElementTimestamps",
-        ParDo.of(
-            new DoFn<KV<String, Integer>, Void>() {
-              @ProcessElement
-              public void verifyTimestampsEqualValue(ProcessContext context) {
-                assertThat(
-                    new Instant(context.element().getValue().longValue()),
-                    equalTo(context.timestamp()));
-              }
-            }));
-
-    pipeline.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
deleted file mode 100644
index 71bfdb5..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableList;
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link Reshuffle}.
- */
-@RunWith(JUnit4.class)
-public class ReshuffleTest implements Serializable {
-
-  private static final List<KV<String, Integer>> ARBITRARY_KVS = ImmutableList.of(
-        KV.of("k1", 3),
-        KV.of("k5", Integer.MAX_VALUE),
-        KV.of("k5", Integer.MIN_VALUE),
-        KV.of("k2", 66),
-        KV.of("k1", 4),
-        KV.of("k2", -33),
-        KV.of("k3", 0));
-
-  // TODO: test with more than one value per key
-  private static final List<KV<String, Integer>> GBK_TESTABLE_KVS = ImmutableList.of(
-        KV.of("k1", 3),
-        KV.of("k2", 4));
-
-  private static final List<KV<String, Iterable<Integer>>> GROUPED_TESTABLE_KVS = ImmutableList.of(
-        KV.of("k1", (Iterable<Integer>) ImmutableList.of(3)),
-        KV.of("k2", (Iterable<Integer>) ImmutableList.of(4)));
-
-  @Rule
-  public final transient TestPipeline pipeline = TestPipeline.create();
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testJustReshuffle() {
-
-    PCollection<KV<String, Integer>> input = pipeline
-        .apply(Create.of(ARBITRARY_KVS)
-            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
-
-    PCollection<KV<String, Integer>> output = input
-        .apply(Reshuffle.<String, Integer>of());
-
-    PAssert.that(output).containsInAnyOrder(ARBITRARY_KVS);
-
-    assertEquals(
-        input.getWindowingStrategy(),
-        output.getWindowingStrategy());
-
-    pipeline.run();
-  }
-
-  /**
-   * Tests that timestamps are preserved after applying a {@link Reshuffle} with the default
-   * {@link WindowingStrategy}.
-   */
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testReshufflePreservesTimestamps() {
-    PCollection<KV<String, TimestampedValue<String>>> input =
-        pipeline
-            .apply(
-                Create.timestamped(
-                        TimestampedValue.of("foo", BoundedWindow.TIMESTAMP_MIN_VALUE),
-                        TimestampedValue.of("foo", new Instant(0)),
-                        TimestampedValue.of("bar", new Instant(33)),
-                        TimestampedValue.of("bar", GlobalWindow.INSTANCE.maxTimestamp()))
-                    .withCoder(StringUtf8Coder.of()))
-            .apply(
-                WithKeys.of(
-                    new SerializableFunction<String, String>() {
-                      @Override
-                      public String apply(String input) {
-                        return input;
-                      }
-                    }))
-            .apply("ReifyOriginalTimestamps", ReifyTimestamps.<String, String>inValues());
-
-    // The outer TimestampedValue is the reified timestamp post-reshuffle. The inner
-    // TimestampedValue is the pre-reshuffle timestamp.
-    PCollection<TimestampedValue<TimestampedValue<String>>> output =
-        input
-            .apply(Reshuffle.<String, TimestampedValue<String>>of())
-            .apply(
-                "ReifyReshuffledTimestamps",
-                ReifyTimestamps.<String, TimestampedValue<String>>inValues())
-            .apply(Values.<TimestampedValue<TimestampedValue<String>>>create());
-
-    PAssert.that(output)
-        .satisfies(
-            new SerializableFunction<Iterable<TimestampedValue<TimestampedValue<String>>>, Void>() {
-              @Override
-              public Void apply(Iterable<TimestampedValue<TimestampedValue<String>>> input) {
-                for (TimestampedValue<TimestampedValue<String>> elem : input) {
-                  Instant originalTimestamp = elem.getValue().getTimestamp();
-                  Instant afterReshuffleTimestamp = elem.getTimestamp();
-                  assertThat(
-                      "Reshuffle must preserve element timestamps",
-                      afterReshuffleTimestamp,
-                      equalTo(originalTimestamp));
-                }
-                return null;
-              }
-            });
-
-    pipeline.run();
-  }
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testReshuffleAfterSessionsAndGroupByKey() {
-
-    PCollection<KV<String, Iterable<Integer>>> input = pipeline
-        .apply(Create.of(GBK_TESTABLE_KVS)
-            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
-        .apply(Window.<KV<String, Integer>>into(
-            Sessions.withGapDuration(Duration.standardMinutes(10))))
-        .apply(GroupByKey.<String, Integer>create());
-
-    PCollection<KV<String, Iterable<Integer>>> output = input
-        .apply(Reshuffle.<String, Iterable<Integer>>of());
-
-    PAssert.that(output).containsInAnyOrder(GROUPED_TESTABLE_KVS);
-
-    assertEquals(
-        input.getWindowingStrategy(),
-        output.getWindowingStrategy());
-
-    pipeline.run();
-  }
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testReshuffleAfterFixedWindowsAndGroupByKey() {
-
-    PCollection<KV<String, Iterable<Integer>>> input = pipeline
-        .apply(Create.of(GBK_TESTABLE_KVS)
-            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
-        .apply(Window.<KV<String, Integer>>into(
-            FixedWindows.of(Duration.standardMinutes(10L))))
-        .apply(GroupByKey.<String, Integer>create());
-
-    PCollection<KV<String, Iterable<Integer>>> output = input
-        .apply(Reshuffle.<String, Iterable<Integer>>of());
-
-    PAssert.that(output).containsInAnyOrder(GROUPED_TESTABLE_KVS);
-
-    assertEquals(
-        input.getWindowingStrategy(),
-        output.getWindowingStrategy());
-
-    pipeline.run();
-  }
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testReshuffleAfterSlidingWindowsAndGroupByKey() {
-
-    PCollection<KV<String, Iterable<Integer>>> input = pipeline
-        .apply(Create.of(GBK_TESTABLE_KVS)
-            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
-        .apply(Window.<KV<String, Integer>>into(
-            FixedWindows.of(Duration.standardMinutes(10L))))
-        .apply(GroupByKey.<String, Integer>create());
-
-    PCollection<KV<String, Iterable<Integer>>> output = input
-        .apply(Reshuffle.<String, Iterable<Integer>>of());
-
-    PAssert.that(output).containsInAnyOrder(GROUPED_TESTABLE_KVS);
-
-    assertEquals(
-        input.getWindowingStrategy(),
-        output.getWindowingStrategy());
-
-    pipeline.run();
-  }
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testReshuffleAfterFixedWindows() {
-
-    PCollection<KV<String, Integer>> input = pipeline
-        .apply(Create.of(ARBITRARY_KVS)
-            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
-        .apply(Window.<KV<String, Integer>>into(
-            FixedWindows.of(Duration.standardMinutes(10L))));
-
-    PCollection<KV<String, Integer>> output = input
-        .apply(Reshuffle.<String, Integer>of());
-
-    PAssert.that(output).containsInAnyOrder(ARBITRARY_KVS);
-
-    assertEquals(
-        input.getWindowingStrategy(),
-        output.getWindowingStrategy());
-
-    pipeline.run();
-  }
-
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testReshuffleAfterSlidingWindows() {
-
-    PCollection<KV<String, Integer>> input = pipeline
-        .apply(Create.of(ARBITRARY_KVS)
-            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
-        .apply(Window.<KV<String, Integer>>into(
-            FixedWindows.of(Duration.standardMinutes(10L))));
-
-    PCollection<KV<String, Integer>> output = input
-        .apply(Reshuffle.<String, Integer>of());
-
-    PAssert.that(output).containsInAnyOrder(ARBITRARY_KVS);
-
-    assertEquals(
-        input.getWindowingStrategy(),
-        output.getWindowingStrategy());
-
-    pipeline.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java
deleted file mode 100644
index 63c71ed..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ReshuffleTrigger}.
- */
-@RunWith(JUnit4.class)
-public class ReshuffleTriggerTest {
-
-  /** Public so that other tests can instantiate {@link ReshuffleTrigger}. */
-  public static <W extends BoundedWindow> ReshuffleTrigger<W> forTest() {
-    return new ReshuffleTrigger<>();
-  }
-
-  @Test
-  public void testToString() {
-    Trigger trigger = new ReshuffleTrigger<>();
-    assertEquals("ReshuffleTrigger()", trigger.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
index 689dbff..e98bfa7 100644
--- a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
@@ -26,7 +26,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
-
 import org.apache.beam.sdk.extensions.sorter.SorterTestUtils.SorterGenerator;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java
index d534c87..0a72517 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java
@@ -17,14 +17,12 @@ package org.apache.beam.sdk.io.common;
 import com.google.common.collect.Lists;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.Hashing;
-
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
-
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index f422135..ba64ab1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -40,12 +40,12 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 318ea89..09508e0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -27,7 +27,6 @@ import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.hash.Hashing;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -35,7 +34,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.regex.Matcher;
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 710c934..aee88e5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -33,7 +33,6 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CalculateSchemas.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CalculateSchemas.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CalculateSchemas.java
index db172dc..1ac216f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CalculateSchemas.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CalculateSchemas.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFn;
-
 import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index 210a072..3dc10b0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -29,7 +29,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.transforms.DoFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
index 20b47e1..886236b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -22,10 +22,10 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index 962e2cd..7a82c54 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -19,7 +19,6 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableReference;
-
 import java.io.Serializable;
 import java.util.Objects;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index 4136fa0..66004b2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -23,7 +23,6 @@ import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
 import org.apache.beam.sdk.transforms.DoFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index f575a3d..bf9d9f1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -28,7 +28,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index e7dba2a..83ff16b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -31,7 +31,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.MoveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;

http://git-wip-us.apache.org/repos/asf/beam/blob/e0b3f806/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
index 367aeb7..18ff688 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
@@ -24,13 +24,11 @@ import com.google.api.services.bigquery.model.JobConfigurationQuery;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.collect.Lists;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.List;
 import java.util.NoSuchElementException;
-
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.ListCoder;