You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/24 04:35:22 UTC

[39/50] [abbrv] beam git commit: Removes OldDoFn and its kin from runners-core

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
deleted file mode 100644
index 761ffb8..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ /dev/null
@@ -1,744 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.base.Predicate;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Properties of {@link GroupAlsoByWindowsDoFn}.
- *
- * <p>Some properties may not hold of some implementations, due to restrictions on the context in
- * which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not
- * support merging windows.
- */
-public class GroupAlsoByWindowsProperties {
-
-  /**
-   * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide the
-   * appropriate windowing strategy under test.
-   */
-  public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> {
-    <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> forStrategy(
-        WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory);
-  }
-
-  /**
-   * Tests that for empty input and the given {@link WindowingStrategy}, the provided GABW
-   * implementation produces no output.
-   *
-   * <p>The input type is deliberately left unconstrained, since it is not relevant.
-   */
-  public static <K, InputT, OutputT> void emptyInputEmptyOutput(
-      GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory) throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
-
-    // This key should never actually be used, though it is eagerly passed to the
-    // StateInternalsFactory so must be non-null
-    @SuppressWarnings("unchecked")
-    K fakeKey = (K) "this key should never be used";
-
-    List<WindowedValue<KV<K, OutputT>>> result =
-        runGABW(
-            gabwFactory,
-            windowingStrategy,
-            fakeKey,
-            Collections.<WindowedValue<InputT>>emptyList());
-
-    assertThat(result, hasSize(0));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups them according to fixed windows.
-   */
-  public static void groupsElementsIntoFixedWindows(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-      throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
-
-    List<WindowedValue<KV<String, Iterable<String>>>> result =
-        runGABW(
-            gabwFactory,
-            windowingStrategy,
-            "key",
-            WindowedValue.of(
-                "v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING));
-
-    assertThat(result, hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        getOnlyElementInWindow(result, window(0, 10));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp()));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        getOnlyElementInWindow(result, window(10, 20));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp()));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups them into sliding windows.
-   *
-   * <p>In the input here, each element occurs in multiple windows.
-   */
-  public static void groupsElementsIntoSlidingWindowsWithMinTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-      throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST);
-
-    List<WindowedValue<KV<String, Iterable<String>>>> result =
-        runGABW(
-            gabwFactory,
-            windowingStrategy,
-            "key",
-            WindowedValue.of(
-                "v1",
-                new Instant(5),
-                Arrays.asList(window(-10, 10), window(0, 20)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(15),
-                Arrays.asList(window(0, 20), window(10, 30)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result, hasSize(3));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        getOnlyElementInWindow(result, window(-10, 10));
-    assertThat(item0.getValue().getValue(), contains("v1"));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(5)));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        getOnlyElementInWindow(result, window(0, 20));
-    assertThat(item1.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
-    assertThat(item1.getTimestamp(), equalTo(new Instant(10)));
-
-    TimestampedValue<KV<String, Iterable<String>>> item2 =
-        getOnlyElementInWindow(result, window(10, 30));
-    assertThat(item2.getValue().getValue(), contains("v2"));
-    // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
-    assertThat(item2.getTimestamp(), equalTo(new Instant(20)));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups and combines them according to sliding windows.
-   *
-   * <p>In the input here, each element occurs in multiple windows.
-   */
-  public static void combinesElementsInSlidingWindows(
-      GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
-      CombineFn<Long, ?, Long> combineFn)
-      throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST);
-
-    List<WindowedValue<KV<String, Long>>> result =
-        runGABW(
-            gabwFactory,
-            windowingStrategy,
-            "k",
-            WindowedValue.of(
-                1L,
-                new Instant(5),
-                Arrays.asList(window(-10, 10), window(0, 20)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                2L,
-                new Instant(15),
-                Arrays.asList(window(0, 20), window(10, 30)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                4L,
-                new Instant(18),
-                Arrays.asList(window(0, 20), window(10, 30)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result, hasSize(3));
-
-    TimestampedValue<KV<String, Long>> item0 = getOnlyElementInWindow(result, window(-10, 10));
-    assertThat(item0.getValue().getKey(), equalTo("k"));
-    assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L))));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(5L)));
-
-    TimestampedValue<KV<String, Long>> item1 = getOnlyElementInWindow(result, window(0, 20));
-    assertThat(item1.getValue().getKey(), equalTo("k"));
-    assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L, 4L))));
-    // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
-    assertThat(item1.getTimestamp(), equalTo(new Instant(10L)));
-
-    TimestampedValue<KV<String, Long>> item2 = getOnlyElementInWindow(result, window(10, 30));
-    assertThat(item2.getValue().getKey(), equalTo("k"));
-    assertThat(item2.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(2L, 4L))));
-    // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
-    assertThat(item2.getTimestamp(), equalTo(new Instant(20L)));
-  }
-
-  /**
-   * Tests that the given GABW implementation correctly groups elements that fall into overlapping
-   * windows that are not merged, emitting one output per distinct window.
-   */
-  public static void groupsIntoOverlappingNonmergingWindows(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-      throws Exception {
-    // The input windows [0, 5) and [1, 5) overlap; the test expects them to stay separate.
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
-
-    List<WindowedValue<KV<String, Iterable<String>>>> result =
-        runGABW(
-            gabwFactory,
-            windowingStrategy,
-            "key",
-            WindowedValue.of("v1", new Instant(1), Arrays.asList(window(0, 5)), PaneInfo.NO_FIRING),
-            WindowedValue.of("v2", new Instant(4), Arrays.asList(window(1, 5)), PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3", new Instant(4), Arrays.asList(window(0, 5)), PaneInfo.NO_FIRING));
-
-    assertThat(result, hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        getOnlyElementInWindow(result, window(0, 5));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v3"));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 5).maxTimestamp()));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        getOnlyElementInWindow(result, window(1, 5));
-    assertThat(item1.getValue().getValue(), contains("v2"));
-    assertThat(item1.getTimestamp(), equalTo(window(1, 5).maxTimestamp()));
-  }
-
-  /** Tests that the given GABW implementation correctly groups elements into merged sessions. */
-  public static void groupsElementsInMergedSessions(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-      throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)));
-
-    List<WindowedValue<KV<String, Iterable<String>>>> result =
-        runGABW(
-            gabwFactory,
-            windowingStrategy,
-            "key",
-            WindowedValue.of(
-                "v1", new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2", new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3", new Instant(15), Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING));
-
-    assertThat(result, hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        getOnlyElementInWindow(result, window(0, 15));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        getOnlyElementInWindow(result, window(15, 25));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
-  }
-
-  /**
-   * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per
-   * session window correctly according to the provided {@link CombineFn}.
-   */
-  public static void combinesElementsPerSession(
-      GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
-      CombineFn<Long, ?, Long> combineFn)
-      throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)));
-
-    List<WindowedValue<KV<String, Long>>> result =
-        runGABW(
-            gabwFactory,
-            windowingStrategy,
-            "k",
-            WindowedValue.of(1L, new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
-            WindowedValue.of(2L, new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                4L, new Instant(15), Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING));
-
-    assertThat(result, hasSize(2));
-
-    TimestampedValue<KV<String, Long>> item0 = getOnlyElementInWindow(result, window(0, 15));
-    assertThat(item0.getValue().getKey(), equalTo("k"));
-    assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L))));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
-
-    TimestampedValue<KV<String, Long>> item1 = getOnlyElementInWindow(result, window(15, 25));
-    assertThat(item1.getValue().getKey(), equalTo("k"));
-    assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L))));
-    assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups them according to fixed windows and also sets the output timestamp according
-   * to the policy {@link TimestampCombiner#END_OF_WINDOW}.
-   */
-  public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-      throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
-
-    List<WindowedValue<KV<String, Iterable<String>>>> result =
-        runGABW(
-            gabwFactory,
-            windowingStrategy,
-            "key",
-            WindowedValue.of(
-                "v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING));
-
-    assertThat(result, hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        getOnlyElementInWindow(result, window(0, 10));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp()));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        getOnlyElementInWindow(result, window(10, 20));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp()));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups them according to fixed windows and also sets the output timestamp according
-   * to the policy {@link TimestampCombiner#LATEST}.
-   */
-  public static void groupsElementsIntoFixedWindowsWithLatestTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-      throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.LATEST);
-
-    List<WindowedValue<KV<String, Iterable<String>>>> result =
-        runGABW(
-            gabwFactory,
-            windowingStrategy,
-            "k",
-            WindowedValue.of(
-                "v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING));
-
-    assertThat(result, hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        getOnlyElementInWindow(result, window(0, 10));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(2)));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        getOnlyElementInWindow(result, window(10, 20));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(new Instant(13)));
-  }
-
-  /**
-   * Tests that the given GABW implementation correctly groups elements into merged sessions with
-   * output timestamps at the end of the merged window.
-   */
-  public static void groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-      throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
-
-    List<WindowedValue<KV<String, Iterable<String>>>> result =
-        runGABW(
-            gabwFactory,
-            windowingStrategy,
-            "k",
-            WindowedValue.of(
-                "v1", new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2", new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3", new Instant(15), Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING));
-
-    assertThat(result, hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        getOnlyElementInWindow(result, window(0, 15));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        getOnlyElementInWindow(result, window(15, 25));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
-  }
-
-  /**
-   * Tests that the given GABW implementation correctly groups elements into merged sessions with
-   * output timestamps at the latest input timestamp within the merged window.
-   */
-  public static void groupsElementsInMergedSessionsWithLatestTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-      throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.LATEST);
-
-    BoundedWindow unmergedWindow = window(15, 25);
-    List<WindowedValue<KV<String, Iterable<String>>>> result =
-        runGABW(
-            gabwFactory,
-            windowingStrategy,
-            "k",
-            WindowedValue.of(
-                "v1", new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2", new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3", new Instant(15), Arrays.asList(unmergedWindow), PaneInfo.NO_FIRING));
-
-    assertThat(result, hasSize(2));
-
-    BoundedWindow mergedWindow = window(0, 15);
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        getOnlyElementInWindow(result, mergedWindow);
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(5)));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        getOnlyElementInWindow(result, unmergedWindow);
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(new Instant(15)));
-  }
-
-  /**
-   * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per
-   * session window according to the provided {@link CombineFn}, with end-of-window timestamps.
-   */
-  public static void combinesElementsPerSessionWithEndOfWindowTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
-      CombineFn<Long, ?, Long> combineFn)
-      throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
-
-    BoundedWindow secondWindow = window(15, 25);
-    List<WindowedValue<KV<String, Long>>> result =
-        runGABW(
-            gabwFactory,
-            windowingStrategy,
-            "k",
-            WindowedValue.of(1L, new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
-            WindowedValue.of(2L, new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING),
-            WindowedValue.of(4L, new Instant(15), Arrays.asList(secondWindow), PaneInfo.NO_FIRING));
-
-    assertThat(result, hasSize(2));
-
-    BoundedWindow firstResultWindow = window(0, 15);
-    TimestampedValue<KV<String, Long>> item0 = getOnlyElementInWindow(result, firstResultWindow);
-    assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L))));
-    assertThat(item0.getTimestamp(), equalTo(firstResultWindow.maxTimestamp()));
-
-    TimestampedValue<KV<String, Long>> item1 = getOnlyElementInWindow(result, secondWindow);
-    assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L))));
-    assertThat(item1.getTimestamp(), equalTo(secondWindow.maxTimestamp()));
-  }
-
-  @SafeVarargs
-  private static <K, InputT, OutputT, W extends BoundedWindow>
-      List<WindowedValue<KV<K, OutputT>>> runGABW(
-          GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
-          WindowingStrategy<?, W> windowingStrategy,
-          K key,
-          WindowedValue<InputT>... values)
-          throws Exception {
-    return runGABW(gabwFactory, windowingStrategy, key, Arrays.asList(values));
-  }
-
-  private static <K, InputT, OutputT, W extends BoundedWindow>
-      List<WindowedValue<KV<K, OutputT>>> runGABW(
-          GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
-          WindowingStrategy<?, W> windowingStrategy,
-          K key,
-          Collection<WindowedValue<InputT>> values)
-          throws Exception {
-
-    final StateInternalsFactory<K> stateInternalsCache = new CachingStateInternalsFactory<K>();
-
-    List<WindowedValue<KV<K, OutputT>>> output =
-        processElement(
-            gabwFactory.forStrategy(windowingStrategy, stateInternalsCache),
-            KV.<K, Iterable<WindowedValue<InputT>>>of(key, values));
-
-    // Sanity check for corruption
-    for (WindowedValue<KV<K, OutputT>> value : output) {
-      assertThat(value.getValue().getKey(), equalTo(key));
-    }
-
-    return output;
-  }
-
-  private static BoundedWindow window(long start, long end) {
-    return new IntervalWindow(new Instant(start), new Instant(end));
-  }
-
-  private static final class CachingStateInternalsFactory<K> implements StateInternalsFactory<K> {
-    private final LoadingCache<K, StateInternals> stateInternalsCache;
-
-    private CachingStateInternalsFactory() {
-      this.stateInternalsCache = CacheBuilder.newBuilder().build(new StateInternalsLoader<K>());
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public StateInternals stateInternalsForKey(K key) {
-      try {
-        return stateInternalsCache.get(key);
-      } catch (Exception exc) {
-        throw new RuntimeException(exc);
-      }
-    }
-  }
-
-  private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals> {
-    @Override
-    public StateInternals load(K key) throws Exception {
-      return InMemoryStateInternals.forKey(key);
-    }
-  }
-
-  private static <K, InputT, OutputT, W extends BoundedWindow>
-      List<WindowedValue<KV<K, OutputT>>> processElement(
-          GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn,
-          KV<K, Iterable<WindowedValue<InputT>>> element)
-          throws Exception {
-    TestProcessContext<K, InputT, OutputT, W> c = new TestProcessContext<>(fn, element);
-    fn.processElement(c);
-    return c.getOutput();
-  }
-
-  private static <K, OutputT> TimestampedValue<KV<K, OutputT>> getOnlyElementInWindow(
-      List<WindowedValue<KV<K, OutputT>>> output, final BoundedWindow window) {
-    WindowedValue<KV<K, OutputT>> res =
-        Iterables.getOnlyElement(
-            Iterables.filter(
-                output,
-                new Predicate<WindowedValue<KV<K, OutputT>>>() {
-                  @Override
-                  public boolean apply(@Nullable WindowedValue<KV<K, OutputT>> input) {
-                    return input.getWindows().contains(window);
-                  }
-                }));
-    return TimestampedValue.of(res.getValue(), res.getTimestamp());
-  }
-
-  /**
-   * A {@link GroupAlsoByWindowsDoFn.ProcessContext} providing just enough context for a {@link
-   * GroupAlsoByWindowsDoFn} - namely, information about the element and output via {@link
-   * WindowingInternals}, but no side inputs/outputs and no normal output.
-   */
-  private static class TestProcessContext<K, InputT, OutputT, W extends BoundedWindow>
-      extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>.ProcessContext {
-    private final PipelineOptions options = PipelineOptionsFactory.create();
-    private final KV<K, Iterable<WindowedValue<InputT>>> element;
-    private final List<WindowedValue<KV<K, OutputT>>> output = new ArrayList<>();
-
-    private TestProcessContext(
-        GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn,
-        KV<K, Iterable<WindowedValue<InputT>>> element) {
-      fn.super();
-      this.element = element;
-    }
-
-    @Override
-    public KV<K, Iterable<WindowedValue<InputT>>> element() {
-      return element;
-    }
-
-    @Override
-    public Instant timestamp() {
-      return BoundedWindow.TIMESTAMP_MIN_VALUE;
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return GlobalWindow.INSTANCE;
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return PaneInfo.NO_FIRING;
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public WindowingInternals<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>
-        windowingInternals() {
-      return new WindowingInternals<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>() {
-        @Override
-        public void outputWindowedValue(
-            KV<K, OutputT> output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          TestProcessContext.this.output.add(WindowedValue.of(output, timestamp, windows, pane));
-        }
-
-        @Override
-        public <AdditionalOutputT> void outputWindowedValue(
-            TupleTag<AdditionalOutputT> tag,
-            AdditionalOutputT output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public StateInternals stateInternals() {
-          throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return ImmutableList.of(GlobalWindow.INSTANCE);
-        }
-
-        @Override
-        public PaneInfo pane() {
-          return PaneInfo.NO_FIRING;
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
-          throw new UnsupportedOperationException();
-        }
-      };
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    @Override
-    public void output(KV<K, OutputT> output) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void outputWithTimestamp(KV<K, OutputT> output, Instant timestamp) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, T output) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      throw new UnsupportedOperationException();
-    }
-
-    public List<WindowedValue<KV<K, OutputT>>> getOutput() {
-      return output;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
deleted file mode 100644
index 581c3e0..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-
-/**
- * A {@link OldDoFn} that does nothing with provided elements. Used for testing
- * methods provided by the {@link OldDoFn} abstract class.
- *
- * @param <InputT> unused.
- * @param <OutputT> unused.
- */
-class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
-  @Override
-  public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws Exception {
-  }
-
-  /**
-   * Returns a new NoOp Context.
-   */
-  public OldDoFn<InputT, OutputT>.Context context() {
-    return new NoOpDoFnContext();
-  }
-
-  /**
-   * A {@link OldDoFn.Context} that does nothing and returns exclusively null.
-   */
-  private class NoOpDoFnContext extends OldDoFn<InputT, OutputT>.Context {
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return null;
-    }
-    @Override
-    public void output(OutputT output) {
-    }
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-    }
-    @Override
-    public <T> void output(TupleTag<T> tag, T output) {
-    }
-    @Override
-    public <T> void outputWithTimestamp(TupleTag<T> tag, T output,
-        Instant timestamp) {
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
deleted file mode 100644
index f608a81..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.hamcrest.Matchers.empty;
-import static org.junit.Assert.assertThat;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for OldDoFn.
- */
-@RunWith(JUnit4.class)
-public class OldDoFnTest implements Serializable {
-
-  @Rule
-  public transient ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testPopulateDisplayDataDefaultBehavior() {
-    OldDoFn<String, String> usesDefault =
-        new OldDoFn<String, String>() {
-          @Override
-          public void processElement(ProcessContext c) throws Exception {}
-        };
-
-    DisplayData data = DisplayData.from(usesDefault);
-    assertThat(data.items(), empty());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
deleted file mode 100644
index a73ef5e..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.hamcrest.Matchers.is;
-import static org.mockito.Mockito.mock;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link SimpleOldDoFnRunner} functionality.
- */
-@RunWith(JUnit4.class)
-public class SimpleOldDoFnRunnerTest {
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testExceptionsWrappedAsUserCodeException() {
-    ThrowingDoFn fn = new ThrowingDoFn();
-    DoFnRunner<String, String> runner = createRunner(fn);
-
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(is(fn.exceptionToThrow));
-
-    runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
-  }
-
-  @Test
-  public void testSystemDoFnInternalExceptionsNotWrapped() {
-    ThrowingSystemDoFn fn = new ThrowingSystemDoFn();
-    DoFnRunner<String, String> runner = createRunner(fn);
-
-    thrown.expect(is(fn.exceptionToThrow));
-
-    runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
-  }
-
-  private DoFnRunner<String, String> createRunner(OldDoFn<String, String> fn) {
-    // Pass in only necessary parameters for the test
-    List<TupleTag<?>> additionalOutputTags = Arrays.asList();
-    StepContext context = mock(StepContext.class);
-    return new SimpleOldDoFnRunner<>(
-        null, fn, null, null, null, additionalOutputTags, context, null);
-  }
-
-  static class ThrowingDoFn extends OldDoFn<String, String> {
-    final Exception exceptionToThrow =
-        new UnsupportedOperationException("Expected exception");
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      throw exceptionToThrow;
-    }
-  }
-
-  @SystemDoFnInternal
-  static class ThrowingSystemDoFn extends ThrowingDoFn {
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 1d079d9..84be15d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Collection;
-import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
+import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
@@ -143,9 +143,9 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
           application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder());
       reduceFn = SystemReduceFn.buffering(valueCoder);
       droppedDueToClosedWindow = Metrics.counter(GroupAlsoByWindowEvaluator.class,
-          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
+          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
       droppedDueToLateness = Metrics.counter(GroupAlsoByWindowEvaluator.class,
-          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER);
+          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 1d843f9..815b6ba 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -23,7 +23,8 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
+import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.runners.core.LateDataUtils;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
@@ -76,7 +77,7 @@ import scala.reflect.ClassTag;
 import scala.runtime.AbstractFunction1;
 
 /**
- * An implementation of {@link org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn}
+ * An implementation of {@link GroupAlsoByWindow}
  * logic for grouping by windows and controlling trigger firings and pane accumulation.
  *
  * <p>This implementation is a composite of Spark transformations revolving around state management
@@ -210,10 +211,10 @@ public class SparkGroupAlsoByWindowViaWindowSet {
         final MetricsContainerImpl cellProvider = new MetricsContainerImpl("cellProvider");
         final CounterCell droppedDueToClosedWindow = cellProvider.getCounter(
             MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class,
-            GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER));
+            GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER));
         final CounterCell droppedDueToLateness = cellProvider.getCounter(
             MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class,
-                GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER));
+                GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER));
 
         AbstractIterator<
             Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
index 18a3dd8..3d76a61 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Iterables;
 import java.util.Collection;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window.Assign;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.api.java.function.Function;
@@ -29,7 +30,8 @@ import org.joda.time.Instant;
 
 
 /**
- * An implementation of {@link org.apache.beam.runners.core.AssignWindowsDoFn} for the Spark runner.
+ * An implementation of {@link Assign} for the Spark
+ * runner.
  */
 public class SparkAssignWindowFn<T, W extends BoundedWindow>
     implements Function<WindowedValue<T>, WindowedValue<T>> {

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index a70885b..be02335 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -21,7 +21,7 @@ package org.apache.beam.runners.spark.translation;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaOutputBufferDoFn;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
@@ -45,7 +45,7 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.joda.time.Instant;
 
 /**
- * An implementation of {@link GroupAlsoByWindowViaOutputBufferDoFn}
+ * An implementation of {@link GroupAlsoByWindow}
  * for the Spark runner.
  */
 public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWindow>