You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/22 02:22:19 UTC

[05/12] incubator-beam git commit: Move some easy stuff into runners/core-java

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
deleted file mode 100644
index 65fc52d..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-/**
- * An implementation of {@link GroupByKey} built on top of a lower-level {@link GroupByKeyOnly}
- * primitive.
- *
- * <p>This implementation of {@link GroupByKey} proceeds via the following steps:
- * <ol>
- *   <li>{@code ReifyTimestampsAndWindows}: This embeds
- *       the previously-implicit timestamp and window into the elements themselves, so a
- *       window-and-timestamp-unaware transform can operate on them.</li>
- *   <li>{@code GroupByKeyOnly}: This lower-level primitive groups by keys, ignoring windows
- *       and timestamps. Many window-unaware runners have such a primitive already.</li>
- *   <li>{@code SortValuesByTimestamp}: The values in the iterables
- *       output by {@link GroupByKeyOnly} are sorted by timestamp.</li>
- *   <li>{@code GroupAlsoByWindow}: This primitive processes the sorted values. Today it is
- *       implemented as a {@link ParDo} that calls reserved internal methods.</li>
- * </ol>
- *
- * <p>This implementation of {@link GroupByKey} has severe limitations unless its component
- * transforms are replaced. As-is, it is only applicable for in-memory runners using a batch-style
- * execution strategy. Specifically:
- *
- * <ul>
- *   <li>Every iterable output by {@link GroupByKeyOnly} must contain all elements for that key.
- *       A streaming-style partition, with multiple elements for the same key, will not yield
- *       correct results.</li>
- *   <li>Sorting of values by timestamp is performed on an in-memory list. It will not succeed
- *       for large iterables.</li>
- *   <li>The implementation of {@code GroupAlsoByWindow} does not support timers. This is only
- *       appropriate for runners which also do not support timers.</li>
- * </ul>
- */
-public class GroupByKeyViaGroupByKeyOnly<K, V>
-    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-
-  private GroupByKey<K, V> gbkTransform;
-
-  public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) {
-    this.gbkTransform = originalTransform;
-  }
-
-  @Override
-  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
-    return input
-        // Make each input element's timestamp and assigned windows
-        // explicit, in the value part.
-        .apply(new ReifyTimestampsAndWindows<K, V>())
-
-        // Group by just the key.
-        // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
-        // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
-        // introduced in here.
-        .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
-
-        // Sort each key's values by timestamp. GroupAlsoByWindow requires
-        // its input to be sorted by timestamp.
-        .apply(new SortValuesByTimestamp<K, V>())
-
-        // Group each key's values by window, merging windows as needed.
-        .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
-
-        // And update the windowing strategy as appropriate.
-        .setWindowingStrategyInternal(
-            gbkTransform.updateWindowingStrategy(windowingStrategy));
-  }
-
-  /**
-   * Runner-specific primitive that groups by key only, ignoring any window assignments. A
-   * runner that uses {@link GroupByKeyViaGroupByKeyOnly} should have a primitive way to translate
-   * or evaluate this class.
-   */
-  public static class GroupByKeyOnly<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    @Override
-    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
-    }
-
-    @Override
-    public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
-      return GroupByKey.getOutputKvCoder(input.getCoder());
-    }
-  }
-
-  /**
-   * Helper transform that makes timestamps and window assignments explicit in the value part of
-   * each key/value pair.
-   */
-  public static class ReifyTimestampsAndWindows<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, WindowedValue<V>>>> {
-
-    @Override
-    public PCollection<KV<K, WindowedValue<V>>> apply(PCollection<KV<K, V>> input) {
-
-      // The requirement to use a KvCoder *is* actually a model-level requirement, not specific
-      // to this implementation of GBK. All runners need a way to get the key.
-      checkArgument(
-          input.getCoder() instanceof KvCoder,
-          "%s requires its input to use a %s",
-          GroupByKey.class.getSimpleName(),
-          KvCoder.class.getSimpleName());
-
-      @SuppressWarnings("unchecked")
-      KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
-      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-      Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
-      Coder<WindowedValue<V>> outputValueCoder =
-          FullWindowedValueCoder.of(
-              inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
-      Coder<KV<K, WindowedValue<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
-      return input
-          .apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
-          .setCoder(outputKvCoder);
-    }
-  }
-
-  /**
-   * Helper transform that sorts the values associated with each key by timestamp.
-   */
-  private static class SortValuesByTimestamp<K, V>
-      extends PTransform<
-          PCollection<KV<K, Iterable<WindowedValue<V>>>>,
-          PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
-    @Override
-    public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
-        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
-      return input
-          .apply(
-              ParDo.of(
-                  new DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<WindowedValue<V>>>>() {
-                    @Override
-                    public void processElement(ProcessContext c) {
-                      KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
-                      K key = kvs.getKey();
-                      Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
-                      List<WindowedValue<V>> sortedValues = new ArrayList<>();
-                      for (WindowedValue<V> value : unsortedValues) {
-                        sortedValues.add(value);
-                      }
-                      Collections.sort(
-                          sortedValues,
-                          new Comparator<WindowedValue<V>>() {
-                            @Override
-                            public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
-                              return e1.getTimestamp().compareTo(e2.getTimestamp());
-                            }
-                          });
-                      c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
-                    }
-                  }))
-          .setCoder(input.getCoder());
-    }
-  }
-
-  /**
-   * Helper transform that takes a collection of timestamp-ordered
-   * values associated with each key, groups the values by window,
-   * combines windows as needed, and for each window in each key,
-   * outputs a collection of key/value-list pairs implicitly assigned
-   * to the window and with the timestamp derived from that window.
-   */
-  private static class GroupAlsoByWindow<K, V>
-      extends PTransform<
-          PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<V>>>> {
-    private final WindowingStrategy<?, ?> windowingStrategy;
-
-    public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
-      this.windowingStrategy = windowingStrategy;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public PCollection<KV<K, Iterable<V>>> apply(
-        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
-      @SuppressWarnings("unchecked")
-      KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
-          (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
-
-      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-      Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder();
-
-      IterableCoder<WindowedValue<V>> inputIterableValueCoder =
-          (IterableCoder<WindowedValue<V>>) inputValueCoder;
-      Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
-      WindowedValueCoder<V> inputIterableWindowedValueCoder =
-          (WindowedValueCoder<V>) inputIterableElementCoder;
-
-      Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
-      Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder);
-      Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
-
-      return input
-          .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
-          .setCoder(outputKvCoder);
-    }
-
-    private <W extends BoundedWindow>
-        GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn(
-            WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
-      return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
-          strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
deleted file mode 100644
index 4815162..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.KV;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-
-import org.joda.time.Instant;
-
-/**
- * A customized {@link DoFnRunner} that handles late data dropping for
- * a {@link KeyedWorkItem} input {@link DoFn}.
- *
- * <p>It expands windows before checking data lateness.
- *
- * <p>{@link KeyedWorkItem KeyedWorkItems} are always in empty windows.
- *
- * @param <K> key type
- * @param <InputT> input value element type
- * @param <OutputT> output value element type
- * @param <W> window type
- */
-public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWindow>
-    implements DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
-  private final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner;
-  private final LateDataFilter lateDataFilter;
-
-  public LateDataDroppingDoFnRunner(
-      DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner,
-      WindowingStrategy<?, ?> windowingStrategy,
-      TimerInternals timerInternals,
-      Aggregator<Long, Long> droppedDueToLateness) {
-    this.doFnRunner = doFnRunner;
-    lateDataFilter = new LateDataFilter(windowingStrategy, timerInternals, droppedDueToLateness);
-  }
-
-  @Override
-  public void startBundle() {
-    doFnRunner.startBundle();
-  }
-
-  @Override
-  public void processElement(WindowedValue<KeyedWorkItem<K, InputT>> elem) {
-    Iterable<WindowedValue<InputT>> nonLateElements = lateDataFilter.filter(
-        elem.getValue().key(), elem.getValue().elementsIterable());
-    KeyedWorkItem<K, InputT> keyedWorkItem = KeyedWorkItems.workItem(
-        elem.getValue().key(), elem.getValue().timersIterable(), nonLateElements);
-    doFnRunner.processElement(elem.withValue(keyedWorkItem));
-  }
-
-  @Override
-  public void finishBundle() {
-    doFnRunner.finishBundle();
-  }
-
-  /**
-   * It filters late data in a {@link KeyedWorkItem}.
-   */
-  @VisibleForTesting
-  static class LateDataFilter {
-    private final WindowingStrategy<?, ?> windowingStrategy;
-    private final TimerInternals timerInternals;
-    private final Aggregator<Long, Long> droppedDueToLateness;
-
-    public LateDataFilter(
-        WindowingStrategy<?, ?> windowingStrategy,
-        TimerInternals timerInternals,
-        Aggregator<Long, Long> droppedDueToLateness) {
-      this.windowingStrategy = windowingStrategy;
-      this.timerInternals = timerInternals;
-      this.droppedDueToLateness = droppedDueToLateness;
-    }
-
-    /**
-     * Returns an {@code Iterable<WindowedValue<InputT>>} that only contains
-     * non-late input elements.
-     */
-    public <K, InputT> Iterable<WindowedValue<InputT>> filter(
-        final K key, Iterable<WindowedValue<InputT>> elements) {
-      Iterable<Iterable<WindowedValue<InputT>>> windowsExpandedElements = Iterables.transform(
-          elements,
-          new Function<WindowedValue<InputT>, Iterable<WindowedValue<InputT>>>() {
-            @Override
-            public Iterable<WindowedValue<InputT>> apply(final WindowedValue<InputT> input) {
-              return Iterables.transform(
-                  input.getWindows(),
-                  new Function<BoundedWindow, WindowedValue<InputT>>() {
-                    @Override
-                    public WindowedValue<InputT> apply(BoundedWindow window) {
-                      return WindowedValue.of(
-                          input.getValue(), input.getTimestamp(), window, input.getPane());
-                    }
-                  });
-            }});
-
-      Iterable<WindowedValue<InputT>> nonLateElements = Iterables.filter(
-          Iterables.concat(windowsExpandedElements),
-          new Predicate<WindowedValue<InputT>>() {
-            @Override
-            public boolean apply(WindowedValue<InputT> input) {
-              BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
-              if (canDropDueToExpiredWindow(window)) {
-                // The element is too late for this window.
-                droppedDueToLateness.addValue(1L);
-                WindowTracing.debug(
-                    "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
-                    + "since too far behind inputWatermark:{}; outputWatermark:{}",
-                    input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
-                    timerInternals.currentOutputWatermarkTime());
-                return false;
-              } else {
-                return true;
-              }
-            }
-          });
-      return nonLateElements;
-    }
-
-    /** Is {@code window} expired w.r.t. the garbage collection watermark? */
-    private boolean canDropDueToExpiredWindow(BoundedWindow window) {
-      Instant inputWM = timerInternals.currentInputWatermarkTime();
-      return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java
deleted file mode 100644
index e809c24..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-
-/**
- * Tracks which windows have non-empty panes. Specifically, which windows have new elements since
- * their last triggering.
- *
- * @param <W> The kind of windows being tracked.
- */
-public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
-
-  static <K, W extends BoundedWindow> NonEmptyPanes<K, W> create(
-      WindowingStrategy<?, W> strategy, ReduceFn<K, ?, ?, W> reduceFn) {
-    if (strategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES) {
-      return new DiscardingModeNonEmptyPanes<>(reduceFn);
-    } else {
-      return new GeneralNonEmptyPanes<>();
-    }
-  }
-
-  /**
-   * Record that some content has been added to the window in {@code context}, and therefore the
-   * current pane is not empty.
-   */
-  public abstract void recordContent(StateAccessor<K> context);
-
-  /**
-   * Record that the given pane is empty.
-   */
-  public abstract void clearPane(StateAccessor<K> state);
-
-  /**
-   * Return true if the current pane for the window in {@code context} is empty.
-   */
-  public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context);
-
-  /**
-   * Prefetch in preparation for merging.
-   */
-  public abstract void prefetchOnMerge(MergingStateAccessor<K, W> state);
-
-  /**
-   * Eagerly merge backing state.
-   */
-  public abstract void onMerge(MergingStateAccessor<K, W> context);
-
-  /**
-   * An implementation of {@code NonEmptyPanes} optimized for use with discarding mode. Uses the
-   * presence of data in the accumulation buffer to record non-empty panes.
-   */
-  private static class DiscardingModeNonEmptyPanes<K, W extends BoundedWindow>
-      extends NonEmptyPanes<K, W> {
-
-    private ReduceFn<K, ?, ?, W> reduceFn;
-
-    private DiscardingModeNonEmptyPanes(ReduceFn<K, ?, ?, W> reduceFn) {
-      this.reduceFn = reduceFn;
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
-      return reduceFn.isEmpty(state);
-    }
-
-    @Override
-    public void recordContent(StateAccessor<K> state) {
-      // Nothing to do -- the reduceFn is tracking contents
-    }
-
-    @Override
-    public void clearPane(StateAccessor<K> state) {
-      // Nothing to do -- the reduceFn is tracking contents
-    }
-
-    @Override
-    public void prefetchOnMerge(MergingStateAccessor<K, W> state) {
-      // Nothing to do -- the reduceFn is tracking contents
-    }
-
-    @Override
-    public void onMerge(MergingStateAccessor<K, W> context) {
-      // Nothing to do -- the reduceFn is tracking contents
-    }
-  }
-
-  /**
-   * An implementation of {@code NonEmptyPanes} for general use.
-   */
-  private static class GeneralNonEmptyPanes<K, W extends BoundedWindow>
-      extends NonEmptyPanes<K, W> {
-
-    private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
-        PANE_ADDITIONS_TAG =
-        StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-            "count", VarLongCoder.of(), new Sum.SumLongFn()));
-
-    @Override
-    public void recordContent(StateAccessor<K> state) {
-      state.access(PANE_ADDITIONS_TAG).add(1L);
-    }
-
-    @Override
-    public void clearPane(StateAccessor<K> state) {
-      state.access(PANE_ADDITIONS_TAG).clear();
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
-      return state.access(PANE_ADDITIONS_TAG).isEmpty();
-    }
-
-    @Override
-    public void prefetchOnMerge(MergingStateAccessor<K, W> state) {
-      StateMerging.prefetchCombiningValues(state, PANE_ADDITIONS_TAG);
-    }
-
-    @Override
-    public void onMerge(MergingStateAccessor<K, W> context) {
-      StateMerging.mergeCombiningValues(context, PANE_ADDITIONS_TAG);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
deleted file mode 100644
index 5e08031..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.ValueState;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Instant;
-
-/**
- * Determine the timing and other properties of a new pane for a given computation, key and window.
- * Incorporates any previous pane, whether the pane has been produced because of an
- * on-time {@link AfterWatermark} trigger firing, and the relation between the element's timestamp
- * and the current output watermark.
- */
-public class PaneInfoTracker {
-  private TimerInternals timerInternals;
-
-  public PaneInfoTracker(TimerInternals timerInternals) {
-    this.timerInternals = timerInternals;
-  }
-
-  @VisibleForTesting
-  static final StateTag<Object, ValueState<PaneInfo>> PANE_INFO_TAG =
-      StateTags.makeSystemTagInternal(StateTags.value("pane", PaneInfoCoder.INSTANCE));
-
-  public void clear(StateAccessor<?> state) {
-    state.access(PANE_INFO_TAG).clear();
-  }
-
-  /**
-   * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
-   * info includes the timing for the pane, whose calculation is quite subtle.
-   *
-   * @param isFinal should be {@code true} only if the triggering machinery can guarantee
-   * no further firings for the window.
-   */
-  public ReadableState<PaneInfo> getNextPaneInfo(
-      ReduceFn<?, ?, ?, ?>.Context context, final boolean isFinal) {
-    final Object key = context.key();
-    final ReadableState<PaneInfo> previousPaneFuture =
-        context.state().access(PaneInfoTracker.PANE_INFO_TAG);
-    final Instant windowMaxTimestamp = context.window().maxTimestamp();
-
-    return new ReadableState<PaneInfo>() {
-      @Override
-      public ReadableState<PaneInfo> readLater() {
-        previousPaneFuture.readLater();
-        return this;
-      }
-
-      @Override
-      public PaneInfo read() {
-        PaneInfo previousPane = previousPaneFuture.read();
-        return describePane(key, windowMaxTimestamp, previousPane, isFinal);
-      }
-    };
-  }
-
-  public void storeCurrentPaneInfo(ReduceFn<?, ?, ?, ?>.Context context, PaneInfo currentPane) {
-    context.state().access(PANE_INFO_TAG).write(currentPane);
-  }
-
-  private <W> PaneInfo describePane(
-      Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean isFinal) {
-    boolean isFirst = previousPane == null;
-    Timing previousTiming = isFirst ? null : previousPane.getTiming();
-    long index = isFirst ? 0 : previousPane.getIndex() + 1;
-    long nonSpeculativeIndex = isFirst ? 0 : previousPane.getNonSpeculativeIndex() + 1;
-    Instant outputWM = timerInternals.currentOutputWatermarkTime();
-    Instant inputWM = timerInternals.currentInputWatermarkTime();
-
-    // True if it is not possible to assign the element representing this pane a timestamp
-    // which will make an ON_TIME pane for any following computation.
-    // I.e., true if the element's latest possible timestamp is before the current output watermark.
-    boolean isLateForOutput = outputWM != null && windowMaxTimestamp.isBefore(outputWM);
-
-    // True if all emitted panes (if any) were EARLY panes.
-    // Once the ON_TIME pane has fired, all following panes must be considered LATE even
-    // if the output watermark is behind the end of the window.
-    boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;
-
-    // True if the input watermark hasn't passed the window's max timestamp.
-    boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp);
-
-    Timing timing;
-    if (isLateForOutput || !onlyEarlyPanesSoFar) {
-      // The output watermark has already passed the end of this window, or we have already
-      // emitted a non-EARLY pane. Irrespective of how this pane was triggered we must
-      // consider this pane LATE.
-      timing = Timing.LATE;
-    } else if (isEarlyForInput) {
-      // This is an EARLY firing.
-      timing = Timing.EARLY;
-      nonSpeculativeIndex = -1;
-    } else {
-      // This is the unique ON_TIME firing for the window.
-      timing = Timing.ON_TIME;
-    }
-
-    WindowTracing.debug(
-        "describePane: {} pane (prev was {}) for key:{}; windowMaxTimestamp:{}; "
-        + "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}",
-        timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isLateForOutput);
-
-    if (previousPane != null) {
-      // Timing transitions should follow EARLY* ON_TIME? LATE*
-      switch (previousTiming) {
-        case EARLY:
-          Preconditions.checkState(
-              timing == Timing.EARLY || timing == Timing.ON_TIME || timing == Timing.LATE,
-              "EARLY cannot transition to %s", timing);
-          break;
-        case ON_TIME:
-          Preconditions.checkState(
-              timing == Timing.LATE, "ON_TIME cannot transition to %s", timing);
-          break;
-        case LATE:
-          Preconditions.checkState(timing == Timing.LATE, "LATE cannot transtion to %s", timing);
-          break;
-        case UNKNOWN:
-          break;
-      }
-      Preconditions.checkState(!previousPane.isLast(), "Last pane was not last after all.");
-    }
-
-    return PaneInfo.createPane(isFirst, isFinal, timing, index, nonSpeculativeIndex);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
deleted file mode 100644
index b1442dd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * A {@link DoFnRunner} that can refuse to process elements whose side inputs are not ready,
- * instead returning them from {@link #processElementInReadyWindows(WindowedValue)}.
- */
-public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
-  private final DoFnRunner<InputT, OutputT> underlying;
-  private final Collection<PCollectionView<?>> views;
-  private final ReadyCheckingSideInputReader sideInputReader;
-
-  private Set<BoundedWindow> notReadyWindows;
-
-  public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create(
-      DoFnRunner<InputT, OutputT> underlying,
-      Collection<PCollectionView<?>> views,
-      ReadyCheckingSideInputReader sideInputReader) {
-    return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
-  }
-
-  private PushbackSideInputDoFnRunner(
-      DoFnRunner<InputT, OutputT> underlying,
-      Collection<PCollectionView<?>> views,
-      ReadyCheckingSideInputReader sideInputReader) {
-    this.underlying = underlying;
-    this.views = views;
-    this.sideInputReader = sideInputReader;
-  }
-
-  @Override
-  public void startBundle() {
-    notReadyWindows = new HashSet<>();
-    underlying.startBundle();
-  }
-
-  /**
-   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
-   * in each of the element's windows that is ready.
-   *
-   * @param elem the element to process in all ready windows
-   * @return each element that could not be processed because it requires a side input window
-   * that is not ready.
-   */
-  public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
-    if (views.isEmpty()) {
-      processElement(elem);
-      return Collections.emptyList();
-    }
-    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
-    for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
-      BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
-      boolean isReady = !notReadyWindows.contains(mainInputWindow);
-      for (PCollectionView<?> view : views) {
-        BoundedWindow sideInputWindow =
-            view.getWindowingStrategyInternal()
-                .getWindowFn()
-                .getSideInputWindow(mainInputWindow);
-        if (!sideInputReader.isReady(view, sideInputWindow)) {
-          isReady = false;
-          break;
-        }
-      }
-      if (isReady) {
-        processElement(windowElem);
-      } else {
-        notReadyWindows.add(mainInputWindow);
-        pushedBack.add(windowElem);
-      }
-    }
-    return pushedBack.build();
-  }
-
-  @Override
-  public void processElement(WindowedValue<InputT> elem) {
-    underlying.processElement(elem);
-  }
-
-  /**
-   * Clear the not-ready window set and call the underlying {@link DoFnRunner#finishBundle()}.
-   */
-  @Override
-  public void finishBundle() {
-    notReadyWindows = null;
-    underlying.finishBundle();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFn.java
deleted file mode 100644
index c5ee1e1..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFn.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateAccessor;
-
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-
-/**
- * Specification for processing to happen after elements have been grouped by key.
- *
- * @param <K> The type of key being processed.
- * @param <InputT> The type of input values associated with the key.
- * @param <OutputT> The output type that will be produced for each key.
- * @param <W> The type of windows this operates on.
- */
-public abstract class ReduceFn<K, InputT, OutputT, W extends BoundedWindow>
-    implements Serializable {
-
-  /** Information accessible to all the processing methods in this {@code ReduceFn}. */
-  public abstract class Context {
-    /** Return the key that is being processed. */
-    public abstract K key();
-
-    /** The window that is being processed. */
-    public abstract W window();
-
-    /** Access the current {@link WindowingStrategy}. */
-    public abstract WindowingStrategy<?, W> windowingStrategy();
-
-    /** Return the interface for accessing state. */
-    public abstract StateAccessor<K> state();
-
-    /** Return the interface for accessing timers. */
-    public abstract Timers timers();
-  }
-
-  /** Information accessible within {@link #processValue}. */
-  public abstract class ProcessValueContext extends Context {
-    /** Return the actual value being processed. */
-    public abstract InputT value();
-
-    /** Return the timestamp associated with the value. */
-    public abstract Instant timestamp();
-  }
-
-  /** Information accessible within {@link #onMerge}. */
-  public abstract class OnMergeContext extends Context {
-    /** Return the interface for accessing state. */
-    @Override
-    public abstract MergingStateAccessor<K, W> state();
-  }
-
-  /** Information accessible within {@link #onTrigger}. */
-  public abstract class OnTriggerContext extends Context {
-    /** Returns the {@link PaneInfo} for the trigger firing being processed. */
-    public abstract PaneInfo paneInfo();
-
-    /** Output the given value in the current window. */
-    public abstract void output(OutputT value);
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Called for each value of type {@code InputT} associated with the current key.
-   */
-  public abstract void processValue(ProcessValueContext c) throws Exception;
-
-  /**
-   * Called when windows are merged.
-   */
-  public abstract void onMerge(OnMergeContext context) throws Exception;
-
-  /**
-   * Called when triggers fire.
-   *
-   * <p>Implementations of {@link ReduceFn} should call {@link OnTriggerContext#output} to emit
-   * any results that should be included in the pane produced by this trigger firing.
-   */
-  public abstract void onTrigger(OnTriggerContext context) throws Exception;
-
-  /**
-   * Called before {@link #onMerge} is invoked to provide an opportunity to prefetch any needed
-   * state.
-   *
-   * @param c the state accessor from which to prefetch.
-   */
-  public void prefetchOnMerge(MergingStateAccessor<K, W> c) throws Exception {}
-
-  /**
-   * Called before {@link #onTrigger} is invoked to provide an opportunity to prefetch any needed
-   * state.
-   *
-   * @param context the state accessor from which to prefetch.
-   */
-  public void prefetchOnTrigger(StateAccessor<K> context) {}
-
-  /**
-   * Called to clear any persisted state that the {@link ReduceFn} may be holding. This will be
-   * called when the window is closing and will receive no further interactions.
-   */
-  public abstract void clearState(Context context) throws Exception;
-
-  /**
-   * Returns true if there is no buffered state.
-   */
-  public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java
deleted file mode 100644
index c90940e..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
-import org.apache.beam.sdk.util.state.StateTag;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * Factory for creating instances of the various {@link ReduceFn} contexts.
- */
-class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
-  public interface OnTriggerCallbacks<OutputT> {
-    void output(OutputT toOutput);
-  }
-
-  private final K key;
-  private final ReduceFn<K, InputT, OutputT, W> reduceFn;
-  private final WindowingStrategy<?, W> windowingStrategy;
-  private final StateInternals<K> stateInternals;
-  private final ActiveWindowSet<W> activeWindows;
-  private final TimerInternals timerInternals;
-  private final WindowingInternals<?, ?> windowingInternals;
-  private final PipelineOptions options;
-
-  ReduceFnContextFactory(K key, ReduceFn<K, InputT, OutputT, W> reduceFn,
-      WindowingStrategy<?, W> windowingStrategy, StateInternals<K> stateInternals,
-      ActiveWindowSet<W> activeWindows, TimerInternals timerInternals,
-      WindowingInternals<?, ?> windowingInternals, PipelineOptions options) {
-    this.key = key;
-    this.reduceFn = reduceFn;
-    this.windowingStrategy = windowingStrategy;
-    this.stateInternals = stateInternals;
-    this.activeWindows = activeWindows;
-    this.timerInternals = timerInternals;
-    this.windowingInternals = windowingInternals;
-    this.options = options;
-  }
-
-  /** Where should we look for state associated with a given window? */
-  public static enum StateStyle {
-    /** All state is associated with the window itself. */
-    DIRECT,
-    /** State is associated with the 'state address' windows tracked by the active window set. */
-    RENAMED
-  }
-
-  private StateAccessorImpl<K, W> stateAccessor(W window, StateStyle style) {
-    return new StateAccessorImpl<K, W>(
-        activeWindows, windowingStrategy.getWindowFn().windowCoder(),
-        stateInternals, StateContexts.createFromComponents(options, windowingInternals, window),
-        style);
-  }
-
-  public ReduceFn<K, InputT, OutputT, W>.Context base(W window, StateStyle style) {
-    return new ContextImpl(stateAccessor(window, style));
-  }
-
-  public ReduceFn<K, InputT, OutputT, W>.ProcessValueContext forValue(
-      W window, InputT value, Instant timestamp, StateStyle style) {
-    return new ProcessValueContextImpl(stateAccessor(window, style), value, timestamp);
-  }
-
-  public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window,
-      ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
-    return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks);
-  }
-
-  public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forMerge(
-      Collection<W> activeToBeMerged, W mergeResult, StateStyle style) {
-    return new OnMergeContextImpl(
-        new MergingStateAccessorImpl<K, W>(activeWindows,
-            windowingStrategy.getWindowFn().windowCoder(),
-            stateInternals, style, activeToBeMerged, mergeResult));
-  }
-
-  public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forPremerge(W window) {
-    return new OnPremergeContextImpl(new PremergingStateAccessorImpl<K, W>(
-        activeWindows, windowingStrategy.getWindowFn().windowCoder(), stateInternals, window));
-  }
-
-  private class TimersImpl implements Timers {
-    private final StateNamespace namespace;
-
-    public TimersImpl(StateNamespace namespace) {
-      Preconditions.checkArgument(namespace instanceof WindowNamespace);
-      this.namespace = namespace;
-    }
-
-    @Override
-    public void setTimer(Instant timestamp, TimeDomain timeDomain) {
-      timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
-    }
-
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
-      timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timerInternals.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timerInternals.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    public Instant currentEventTime() {
-      return timerInternals.currentInputWatermarkTime();
-    }
-  }
-
-  // ======================================================================
-  // StateAccessors
-  // ======================================================================
-  static class StateAccessorImpl<K, W extends BoundedWindow> implements StateAccessor<K> {
-
-
-    protected final ActiveWindowSet<W> activeWindows;
-    protected final StateContext<W> context;
-    protected final StateNamespace windowNamespace;
-    protected final Coder<W> windowCoder;
-    protected final StateInternals<K> stateInternals;
-    protected final StateStyle style;
-
-    public StateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
-        StateInternals<K> stateInternals, StateContext<W> context, StateStyle style) {
-
-      this.activeWindows = activeWindows;
-      this.windowCoder = windowCoder;
-      this.stateInternals = stateInternals;
-      this.context = checkNotNull(context);
-      this.windowNamespace = namespaceFor(context.window());
-      this.style = style;
-    }
-
-    protected StateNamespace namespaceFor(W window) {
-      return StateNamespaces.window(windowCoder, window);
-    }
-
-    protected StateNamespace windowNamespace() {
-      return windowNamespace;
-    }
-
-    W window() {
-      return context.window();
-    }
-
-    StateNamespace namespace() {
-      return windowNamespace();
-    }
-
-    @Override
-    public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
-      switch (style) {
-        case DIRECT:
-          return stateInternals.state(windowNamespace(), address, context);
-        case RENAMED:
-          return stateInternals.state(
-              namespaceFor(activeWindows.writeStateAddress(context.window())), address, context);
-      }
-      throw new RuntimeException(); // cases are exhaustive.
-    }
-  }
-
-  static class MergingStateAccessorImpl<K, W extends BoundedWindow>
-      extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
-    private final Collection<W> activeToBeMerged;
-
-    public MergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
-        StateInternals<K> stateInternals, StateStyle style, Collection<W> activeToBeMerged,
-        W mergeResult) {
-      super(activeWindows, windowCoder, stateInternals,
-          StateContexts.windowOnly(mergeResult), style);
-      this.activeToBeMerged = activeToBeMerged;
-    }
-
-    @Override
-    public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
-      switch (style) {
-        case DIRECT:
-          return stateInternals.state(windowNamespace(), address, context);
-        case RENAMED:
-          return stateInternals.state(
-              namespaceFor(activeWindows.mergedWriteStateAddress(
-                  activeToBeMerged, context.window())),
-              address,
-              context);
-      }
-      throw new RuntimeException(); // cases are exhaustive.
-    }
-
-    @Override
-    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-        StateTag<? super K, StateT> address) {
-      ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
-      for (W mergingWindow : activeToBeMerged) {
-        StateNamespace namespace = null;
-        switch (style) {
-          case DIRECT:
-            namespace = namespaceFor(mergingWindow);
-            break;
-          case RENAMED:
-            namespace = namespaceFor(activeWindows.writeStateAddress(mergingWindow));
-            break;
-        }
-        Preconditions.checkNotNull(namespace); // cases are exhaustive.
-        builder.put(mergingWindow, stateInternals.state(namespace, address, context));
-      }
-      return builder.build();
-    }
-  }
-
-  static class PremergingStateAccessorImpl<K, W extends BoundedWindow>
-      extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
-    public PremergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
-        StateInternals<K> stateInternals, W window) {
-      super(activeWindows, windowCoder, stateInternals,
-          StateContexts.windowOnly(window), StateStyle.RENAMED);
-    }
-
-    Collection<W> mergingWindows() {
-      return activeWindows.readStateAddresses(context.window());
-    }
-
-    @Override
-    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-        StateTag<? super K, StateT> address) {
-      ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
-      for (W stateAddressWindow : activeWindows.readStateAddresses(context.window())) {
-        StateT stateForWindow =
-            stateInternals.state(namespaceFor(stateAddressWindow), address, context);
-        builder.put(stateAddressWindow, stateForWindow);
-      }
-      return builder.build();
-    }
-  }
-
-  // ======================================================================
-  // Contexts
-  // ======================================================================
-
-  private class ContextImpl extends ReduceFn<K, InputT, OutputT, W>.Context {
-    private final StateAccessorImpl<K, W> state;
-    private final TimersImpl timers;
-
-    private ContextImpl(StateAccessorImpl<K, W> state) {
-      reduceFn.super();
-      this.state = state;
-      this.timers = new TimersImpl(state.namespace());
-    }
-
-    @Override
-    public K key() {
-      return key;
-    }
-
-    @Override
-    public W window() {
-      return state.window();
-    }
-
-    @Override
-    public WindowingStrategy<?, W> windowingStrategy() {
-      return windowingStrategy;
-    }
-
-    @Override
-    public StateAccessor<K> state() {
-      return state;
-    }
-
-    @Override
-    public Timers timers() {
-      return timers;
-    }
-  }
-
-  private class ProcessValueContextImpl
-      extends ReduceFn<K, InputT, OutputT, W>.ProcessValueContext {
-    private final InputT value;
-    private final Instant timestamp;
-    private final StateAccessorImpl<K, W> state;
-    private final TimersImpl timers;
-
-    private ProcessValueContextImpl(StateAccessorImpl<K, W> state,
-        InputT value, Instant timestamp) {
-      reduceFn.super();
-      this.state = state;
-      this.value = value;
-      this.timestamp = timestamp;
-      this.timers = new TimersImpl(state.namespace());
-    }
-
-    @Override
-    public K key() {
-      return key;
-    }
-
-    @Override
-    public W window() {
-      return state.window();
-    }
-
-    @Override
-    public WindowingStrategy<?, W> windowingStrategy() {
-      return windowingStrategy;
-    }
-
-    @Override
-    public StateAccessor<K> state() {
-      return state;
-    }
-
-    @Override
-    public InputT value() {
-      return value;
-    }
-
-    @Override
-    public Instant timestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public Timers timers() {
-      return timers;
-    }
-  }
-
-  private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext {
-    private final StateAccessorImpl<K, W> state;
-    private final ReadableState<PaneInfo> pane;
-    private final OnTriggerCallbacks<OutputT> callbacks;
-    private final TimersImpl timers;
-
-    private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane,
-        OnTriggerCallbacks<OutputT> callbacks) {
-      reduceFn.super();
-      this.state = state;
-      this.pane = pane;
-      this.callbacks = callbacks;
-      this.timers = new TimersImpl(state.namespace());
-    }
-
-    @Override
-    public K key() {
-      return key;
-    }
-
-    @Override
-    public W window() {
-      return state.window();
-    }
-
-    @Override
-    public WindowingStrategy<?, W> windowingStrategy() {
-      return windowingStrategy;
-    }
-
-    @Override
-    public StateAccessor<K> state() {
-      return state;
-    }
-
-    @Override
-    public PaneInfo paneInfo() {
-      return pane.read();
-    }
-
-    @Override
-    public void output(OutputT value) {
-      callbacks.output(value);
-    }
-
-    @Override
-    public Timers timers() {
-      return timers;
-    }
-  }
-
-  private class OnMergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext {
-    private final MergingStateAccessorImpl<K, W> state;
-    private final TimersImpl timers;
-
-    private OnMergeContextImpl(MergingStateAccessorImpl<K, W> state) {
-      reduceFn.super();
-      this.state = state;
-      this.timers = new TimersImpl(state.namespace());
-    }
-
-    @Override
-    public K key() {
-      return key;
-    }
-
-    @Override
-    public WindowingStrategy<?, W> windowingStrategy() {
-      return windowingStrategy;
-    }
-
-    @Override
-    public MergingStateAccessor<K, W> state() {
-      return state;
-    }
-
-    @Override
-    public W window() {
-      return state.window();
-    }
-
-    @Override
-    public Timers timers() {
-      return timers;
-    }
-  }
-
-  private class OnPremergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext {
-    private final PremergingStateAccessorImpl<K, W> state;
-    private final TimersImpl timers;
-
-    private OnPremergeContextImpl(PremergingStateAccessorImpl<K, W> state) {
-      reduceFn.super();
-      this.state = state;
-      this.timers = new TimersImpl(state.namespace());
-    }
-
-    @Override
-    public K key() {
-      return key;
-    }
-
-    @Override
-    public WindowingStrategy<?, W> windowingStrategy() {
-      return windowingStrategy;
-    }
-
-    @Override
-    public MergingStateAccessor<K, W> state() {
-      return state;
-    }
-
-    @Override
-    public W window() {
-      return state.window();
-    }
-
-    @Override
-    public Timers timers() {
-      return timers;
-    }
-  }
-}