Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:34 UTC

[10/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java
deleted file mode 100644
index 8b2d56f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.hash.Funnels;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import com.google.common.io.CountingOutputStream;
-import com.google.common.io.Files;
-
-import com.fasterxml.jackson.core.Base64Variants;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-
-/** Helper routines for packages. */
-public class PackageUtil {
-  private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class);
-  /**
-   * A reasonable upper bound on the number of jars required to launch a Dataflow job.
-   */
-  public static final int SANE_CLASSPATH_SIZE = 1000;
-  /**
-   * The initial interval to use between package staging attempts.
-   */
-  private static final long INITIAL_BACKOFF_INTERVAL_MS = 5000L;
-  /**
-   * The maximum number of attempts when staging a file.
-   */
-  private static final int MAX_ATTEMPTS = 5;
-
-  /**
-   * Translates exceptions from API calls.
-   */
-  private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();
-
-  /**
-   * Creates a DataflowPackage containing information about how a classpath element should be
-   * staged, including the staging destination as well as its size and hash.
-   *
-   * @param classpathElement The local path for the classpath element.
-   * @param stagingPath The base location for staged classpath elements.
-   * @param overridePackageName If non-null, use the given value as the package name
-   *                            instead of generating one automatically.
-   * @return The package.
-   */
-  @Deprecated
-  public static DataflowPackage createPackage(File classpathElement,
-      String stagingPath, String overridePackageName) {
-    return createPackageAttributes(classpathElement, stagingPath, overridePackageName)
-        .getDataflowPackage();
-  }
-
-  /**
-   * Compute and cache the attributes of a classpath element that are needed to stage it.
-   *
-   * @param classpathElement the file or directory to be staged.
-   * @param stagingPath The base location for staged classpath elements.
-   * @param overridePackageName If non-null, use the given value as the package name
-   *                            instead of generating one automatically.
-   * @return a {@link PackageAttributes} containing metadata about the object to be staged.
-   */
-  static PackageAttributes createPackageAttributes(File classpathElement,
-      String stagingPath, String overridePackageName) {
-    try {
-      boolean directory = classpathElement.isDirectory();
-
-      // Compute size and hash in one pass over file or directory.
-      Hasher hasher = Hashing.md5().newHasher();
-      OutputStream hashStream = Funnels.asOutputStream(hasher);
-      CountingOutputStream countingOutputStream = new CountingOutputStream(hashStream);
-
-      if (!directory) {
-        // Files are staged as-is.
-        Files.asByteSource(classpathElement).copyTo(countingOutputStream);
-      } else {
-        // Directories are recursively zipped.
-        ZipFiles.zipDirectory(classpathElement, countingOutputStream);
-      }
-
-      long size = countingOutputStream.getCount();
-      String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
-
-      // Create the DataflowPackage with staging name and location.
-      String uniqueName = getUniqueContentName(classpathElement, hash);
-      String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName);
-      DataflowPackage target = new DataflowPackage();
-      target.setName(overridePackageName != null ? overridePackageName : uniqueName);
-      target.setLocation(resourcePath);
-
-      return new PackageAttributes(size, hash, directory, target);
-    } catch (IOException e) {
-      throw new RuntimeException("Package setup failure for " + classpathElement, e);
-    }
-  }
-
-  /**
-   * Transfers the classpath elements to the staging location.
-   *
-   * @param classpathElements The elements to stage.
-   * @param stagingPath The base location to stage the elements to.
-   * @return A list of cloud workflow packages, each representing a classpath element.
-   */
-  public static List<DataflowPackage> stageClasspathElements(
-      Collection<String> classpathElements, String stagingPath) {
-    return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT);
-  }
-
-  // Visible for testing.
-  static List<DataflowPackage> stageClasspathElements(
-      Collection<String> classpathElements, String stagingPath,
-      Sleeper retrySleeper) {
-    LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
-        + "prepare for execution.", classpathElements.size());
-
-    if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
-      LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically "
-          + "copies to all workers. Having this many entries on your classpath may be indicative "
-          + "of an issue in your pipeline. You may want to consider trimming the classpath to "
-          + "necessary dependencies only, using --filesToStage pipeline option to override "
-          + "what files are being staged, or bundling several dependencies into one.",
-          classpathElements.size());
-    }
-
-    ArrayList<DataflowPackage> packages = new ArrayList<>();
-
-    if (stagingPath == null) {
-      throw new IllegalArgumentException(
-          "Can't stage classpath elements on because no staging location has been provided");
-    }
-
-    int numUploaded = 0;
-    int numCached = 0;
-    for (String classpathElement : classpathElements) {
-      String packageName = null;
-      if (classpathElement.contains("=")) {
-        String[] components = classpathElement.split("=", 2);
-        packageName = components[0];
-        classpathElement = components[1];
-      }
-
-      File file = new File(classpathElement);
-      if (!file.exists()) {
-        LOG.warn("Skipping non-existent classpath element {} that was specified.",
-            classpathElement);
-        continue;
-      }
-
-      PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName);
-
-      DataflowPackage workflowPackage = attributes.getDataflowPackage();
-      packages.add(workflowPackage);
-      String target = workflowPackage.getLocation();
-
-      // TODO: Should we attempt to detect the Mime type rather than
-      // always using MimeTypes.BINARY?
-      try {
-        try {
-          long remoteLength = IOChannelUtils.getSizeBytes(target);
-          if (remoteLength == attributes.getSize()) {
-            LOG.debug("Skipping classpath element already staged: {} at {}",
-                classpathElement, target);
-            numCached++;
-            continue;
-          }
-        } catch (FileNotFoundException expected) {
-          // If the file doesn't exist, it means we need to upload it.
-        }
-
-        // Upload file, retrying on failure.
-        AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
-            MAX_ATTEMPTS,
-            INITIAL_BACKOFF_INTERVAL_MS);
-        while (true) {
-          try {
-            LOG.debug("Uploading classpath element {} to {}", classpathElement, target);
-            try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) {
-              copyContent(classpathElement, writer);
-            }
-            numUploaded++;
-            break;
-          } catch (IOException e) {
-            if (ERROR_EXTRACTOR.accessDenied(e)) {
-              String errorMessage = String.format(
-                  "Uploaded failed due to permissions error, will NOT retry staging "
-                  + "of classpath %s. Please verify credentials are valid and that you have "
-                  + "write access to %s. Stale credentials can be resolved by executing "
-                  + "'gcloud auth login'.", classpathElement, target);
-              LOG.error(errorMessage);
-              throw new IOException(errorMessage, e);
-            } else if (!backoff.atMaxAttempts()) {
-              LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
-                  classpathElement, e);
-              BackOffUtils.next(retrySleeper, backoff);
-            } else {
-              // Rethrow last error, to be included as a cause in the catch below.
-              LOG.error("Upload failed, will NOT retry staging of classpath: {}",
-                  classpathElement, e);
-              throw e;
-            }
-          }
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Could not stage classpath element: " + classpathElement, e);
-      }
-    }
-
-    LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, "
-        + "{} files cached",
-        numUploaded, numCached);
-
-    return packages;
-  }
-
-  /**
-   * Returns a unique name for a file with a given content hash.
-   *
-   * <p>Directory paths are removed. Example:
-   * <pre>
-   * dir="a/b/c/d", contentHash="f000" => d-f000.jar
-   * file="a/b/c/d.txt", contentHash="f000" => d-f000.txt
-   * file="a/b/c/d", contentHash="f000" => d-f000
-   * </pre>
-   */
-  static String getUniqueContentName(File classpathElement, String contentHash) {
-    String fileName = Files.getNameWithoutExtension(classpathElement.getAbsolutePath());
-    String fileExtension = Files.getFileExtension(classpathElement.getAbsolutePath());
-    if (classpathElement.isDirectory()) {
-      return fileName + "-" + contentHash + ".jar";
-    } else if (fileExtension.isEmpty()) {
-      return fileName + "-" + contentHash;
-    }
-    return fileName + "-" + contentHash + "." + fileExtension;
-  }
-
-  /**
-   * Copies the contents of the classpathElement to the output channel.
-   *
-   * <p>If the classpathElement is a directory, a Zip stream is constructed on the fly,
-   * otherwise the file contents are copied as-is.
-   *
-   * <p>The output channel is not closed.
-   */
-  private static void copyContent(String classpathElement, WritableByteChannel outputChannel)
-      throws IOException {
-    final File classpathElementFile = new File(classpathElement);
-    if (classpathElementFile.isDirectory()) {
-      ZipFiles.zipDirectory(classpathElementFile, Channels.newOutputStream(outputChannel));
-    } else {
-      Files.asByteSource(classpathElementFile).copyTo(Channels.newOutputStream(outputChannel));
-    }
-  }
-  /**
-   * Holds the metadata necessary to stage a file or confirm that a staged file has not changed.
-   */
-  static class PackageAttributes {
-    private final boolean directory;
-    private final long size;
-    private final String hash;
-    private DataflowPackage dataflowPackage;
-
-    public PackageAttributes(long size, String hash, boolean directory,
-        DataflowPackage dataflowPackage) {
-      this.size = size;
-      this.hash = Objects.requireNonNull(hash, "hash");
-      this.directory = directory;
-      this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage");
-    }
-
-    /**
-     * @return the dataflowPackage
-     */
-    public DataflowPackage getDataflowPackage() {
-      return dataflowPackage;
-    }
-
-    /**
-     * @return the directory
-     */
-    public boolean isDirectory() {
-      return directory;
-    }
-
-    /**
-     * @return the size
-     */
-    public long getSize() {
-      return size;
-    }
-
-    /**
-     * @return the hash
-     */
-    public String getHash() {
-      return hash;
-    }
-  }
-}
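
The staging scheme above is content-addressed: size and MD5 hash are computed in a single
pass, and the hash becomes part of the staged file name so an unchanged element can be
detected remotely by its size and skipped. A minimal standalone sketch of the naming step
(illustrative only: it reads the whole file into memory rather than streaming, handles
only plain files rather than zipped directories, and assumes the JDK's URL-safe base64
alphabet as a stand-in for Jackson's MODIFIED_FOR_URL variant):

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.nio.file.Path;
    import java.security.MessageDigest;
    import java.util.Base64;

    public class StagingNameSketch {
      /** Returns "basename-<url-safe-md5>[.ext]", mirroring getUniqueContentName above. */
      static String uniqueContentName(Path file) throws Exception {
        byte[] md5 = MessageDigest.getInstance("MD5").digest(Files.readAllBytes(file));
        String hash = Base64.getUrlEncoder().withoutPadding().encodeToString(md5);
        String name = file.getFileName().toString();
        int dot = name.lastIndexOf('.');
        return dot < 0
            ? name + "-" + hash
            : name.substring(0, dot) + "-" + hash + name.substring(dot);
      }

      public static void main(String[] args) throws Exception {
        System.out.println(uniqueContentName(Paths.get(args[0])));
      }
    }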

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
deleted file mode 100644
index a7818a3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo.Timing;
-import com.google.cloud.dataflow.sdk.util.state.ReadableState;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.cloud.dataflow.sdk.util.state.StateTags;
-import com.google.cloud.dataflow.sdk.util.state.ValueState;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Instant;
-
-/**
- * Determine the timing and other properties of a new pane for a given computation, key and window.
- * Incorporates any previous pane, whether the pane was produced by an on-time
- * {@link AfterWatermark} trigger firing, and the relation between the element's timestamp
- * and the current output watermark.
- */
-public class PaneInfoTracker {
-  private TimerInternals timerInternals;
-
-  public PaneInfoTracker(TimerInternals timerInternals) {
-    this.timerInternals = timerInternals;
-  }
-
-  @VisibleForTesting
-  static final StateTag<Object, ValueState<PaneInfo>> PANE_INFO_TAG =
-      StateTags.makeSystemTagInternal(StateTags.value("pane", PaneInfoCoder.INSTANCE));
-
-  public void clear(StateAccessor<?> state) {
-    state.access(PANE_INFO_TAG).clear();
-  }
-
-  /**
-   * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
-   * info includes the timing for the pane, whose calculation is quite subtle.
-   *
-   * @param isFinal should be {@code true} only if the triggering machinery can guarantee
-   * no further firings for the window.
-   */
-  public ReadableState<PaneInfo> getNextPaneInfo(
-      ReduceFn<?, ?, ?, ?>.Context context, final boolean isFinal) {
-    final Object key = context.key();
-    final ReadableState<PaneInfo> previousPaneFuture =
-        context.state().access(PaneInfoTracker.PANE_INFO_TAG);
-    final Instant windowMaxTimestamp = context.window().maxTimestamp();
-
-    return new ReadableState<PaneInfo>() {
-      @Override
-      public ReadableState<PaneInfo> readLater() {
-        previousPaneFuture.readLater();
-        return this;
-      }
-
-      @Override
-      public PaneInfo read() {
-        PaneInfo previousPane = previousPaneFuture.read();
-        return describePane(key, windowMaxTimestamp, previousPane, isFinal);
-      }
-    };
-  }
-
-  public void storeCurrentPaneInfo(ReduceFn<?, ?, ?, ?>.Context context, PaneInfo currentPane) {
-    context.state().access(PANE_INFO_TAG).write(currentPane);
-  }
-
-  private <W> PaneInfo describePane(
-      Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean isFinal) {
-    boolean isFirst = previousPane == null;
-    Timing previousTiming = isFirst ? null : previousPane.getTiming();
-    long index = isFirst ? 0 : previousPane.getIndex() + 1;
-    long nonSpeculativeIndex = isFirst ? 0 : previousPane.getNonSpeculativeIndex() + 1;
-    Instant outputWM = timerInternals.currentOutputWatermarkTime();
-    Instant inputWM = timerInternals.currentInputWatermarkTime();
-
-    // True if it is not possible to assign the element representing this pane a timestamp
-    // which will make an ON_TIME pane for any following computation.
-    // That is, true if the element's latest possible timestamp is before the current output watermark.
-    boolean isLateForOutput = outputWM != null && windowMaxTimestamp.isBefore(outputWM);
-
-    // True if all emitted panes (if any) were EARLY panes.
-    // Once the ON_TIME pane has fired, all following panes must be considered LATE even
-    // if the output watermark is behind the end of the window.
-    boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;
-
-    // True if the input watermark hasn't passed the window's max timestamp.
-    boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp);
-
-    Timing timing;
-    if (isLateForOutput || !onlyEarlyPanesSoFar) {
-      // The output watermark has already passed the end of this window, or we have already
-      // emitted a non-EARLY pane. Irrespective of how this pane was triggered we must
-      // consider this pane LATE.
-      timing = Timing.LATE;
-    } else if (isEarlyForInput) {
-      // This is an EARLY firing.
-      timing = Timing.EARLY;
-      nonSpeculativeIndex = -1;
-    } else {
-      // This is the unique ON_TIME firing for the window.
-      timing = Timing.ON_TIME;
-    }
-
-    WindowTracing.debug(
-        "describePane: {} pane (prev was {}) for key:{}; windowMaxTimestamp:{}; "
-        + "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}",
-        timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isLateForOutput);
-
-    if (previousPane != null) {
-      // Timing transitions should follow EARLY* ON_TIME? LATE*
-      switch (previousTiming) {
-        case EARLY:
-          Preconditions.checkState(
-              timing == Timing.EARLY || timing == Timing.ON_TIME || timing == Timing.LATE,
-              "EARLY cannot transition to %s", timing);
-          break;
-        case ON_TIME:
-          Preconditions.checkState(
-              timing == Timing.LATE, "ON_TIME cannot transition to %s", timing);
-          break;
-        case LATE:
-          Preconditions.checkState(timing == Timing.LATE, "LATE cannot transition to %s", timing);
-          break;
-        case UNKNOWN:
-          break;
-      }
-      Preconditions.checkState(!previousPane.isLast(), "Last pane was not last after all.");
-    }
-
-    return PaneInfo.createPane(isFirst, isFinal, timing, index, nonSpeculativeIndex);
-  }
-}
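
The timing decision in describePane above reduces to two watermark tests plus one bit of
pane history. A self-contained sketch of just that classification, using java.time.Instant
in place of Joda's and ignoring the index bookkeeping and state access:

    import java.time.Instant;

    public class PaneTimingSketch {
      enum Timing { EARLY, ON_TIME, LATE }

      /** Null watermarks mean "unknown", matching the checks in describePane. */
      static Timing classify(Instant windowMaxTimestamp, Instant inputWM, Instant outputWM,
          boolean onlyEarlyPanesSoFar) {
        // The element's latest possible timestamp is already behind the output watermark.
        boolean isLateForOutput = outputWM != null && windowMaxTimestamp.isBefore(outputWM);
        // The input watermark has not yet passed the window's max timestamp.
        boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp);
        if (isLateForOutput || !onlyEarlyPanesSoFar) {
          return Timing.LATE;      // a non-EARLY pane already fired, or output is past the window
        } else if (isEarlyForInput) {
          return Timing.EARLY;     // a speculative firing
        } else {
          return Timing.ON_TIME;   // the unique on-time firing for the window
        }
      }
    }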

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PathValidator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PathValidator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PathValidator.java
deleted file mode 100644
index 658de2a..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PathValidator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-/**
- * Interface for controlling validation of paths.
- */
-public interface PathValidator {
-  /**
-   * Validate that a file pattern is conforming.
-   *
-   * @param filepattern The file pattern to verify.
-   * @return The post-validation filepattern.
-   */
-  public String validateInputFilePatternSupported(String filepattern);
-
-  /**
-   * Validate that an output file prefix is conforming.
-   *
-   * @param filePrefix the file prefix to verify.
-   * @return The post-validation filePrefix.
-   */
-  public String validateOutputFilePrefixSupported(String filePrefix);
-
-  /**
-   * Validate that a path is well-formed and that the path
-   * is accessible.
-   *
-   * @param path The path to verify.
-   * @return The post-validation path.
-   */
-  public String verifyPath(String path);
-}
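
The contract is small enough that a hypothetical pass-through implementation makes it
concrete; a real validator (for example, one restricted to gs:// paths) would throw on a
malformed or inaccessible location instead of returning it:

    /** Hypothetical validator that accepts every path unchanged. */
    public class AcceptAllPathValidator implements PathValidator {
      @Override
      public String validateInputFilePatternSupported(String filepattern) {
        return filepattern;  // a real validator would check scheme and syntax here
      }

      @Override
      public String validateOutputFilePrefixSupported(String filePrefix) {
        return filePrefix;
      }

      @Override
      public String verifyPath(String path) {
        return path;  // a real validator would also probe accessibility
      }
    }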

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunner.java
deleted file mode 100644
index b5f328f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunner.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-/**
- * An interface that runs a {@link PerKeyCombineFn} with unified APIs.
- *
- * <p>Different keyed combine functions have their own implementations.
- * For example, the implementation can skip allocating {@code Combine.Context}
- * if the keyed combine function doesn't use it.
- */
-public interface PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Serializable {
-  /**
-   * Returns the {@link PerKeyCombineFn} it holds.
-   *
-   * <p>It can be a {@code KeyedCombineFn} or a {@code KeyedCombineFnWithContext}.
-   */
-  public PerKeyCombineFn<K, InputT, AccumT, OutputT> fn();
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator in a {@link DoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
-   * if it is required.
-   */
-  public AccumT createAccumulator(K key, DoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to add the input in a {@link DoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
-   * if it is required.
-   */
-  public AccumT addInput(K key, AccumT accumulator, InputT input, DoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators in a {@link DoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
-   * if it is required.
-   */
-  public AccumT mergeAccumulators(
-      K key, Iterable<AccumT> accumulators, DoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to extract the output in a {@link DoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
-   * if it is required.
-   */
-  public OutputT extractOutput(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator in a {@link DoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
-   * if it is required.
-   */
-  public AccumT compact(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to combine the inputs and extract output
-   * in a {@link DoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
-   * if it is required.
-   */
-  public OutputT apply(K key, Iterable<? extends InputT> inputs, DoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to add all inputs in a {@link DoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
-   * if it is required.
-   */
-  public AccumT addInputs(K key, Iterable<InputT> inputs, DoFn<?, ?>.ProcessContext c);
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from
-   * {@link PipelineOptions} and {@link SideInputReader} if it is required.
-   */
-  public AccumT createAccumulator(K key, PipelineOptions options,
-      SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to add the input.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from
-   * {@link PipelineOptions} and {@link SideInputReader} if it is required.
-   */
-  public AccumT addInput(K key, AccumT accumulator, InputT value, PipelineOptions options,
-      SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from
-   * {@link PipelineOptions} and {@link SideInputReader} if it is required.
-   */
-  public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
-      SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to extract the output.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from
-   * {@link PipelineOptions} and {@link SideInputReader} if it is required.
-   */
-  public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
-      SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from
-   * {@link PipelineOptions} and {@link SideInputReader} if it is required.
-   */
-  public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-      SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-}
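
The point of the unified API is that a caller can fold inputs and extract output without
knowing whether the underlying function needs a context. A hypothetical helper written
against the interface above (the runner supplies a context internally only if its wrapped
function requires one):

    static <K, InputT, AccumT, OutputT> OutputT combineValues(
        PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> runner,
        K key, Iterable<InputT> inputs, DoFn<?, ?>.ProcessContext c) {
      // addInputs covers createAccumulator plus one addInput per element.
      AccumT accum = runner.addInputs(key, inputs, c);
      return runner.extractOutput(key, accum, c);
    }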

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunners.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunners.java
deleted file mode 100644
index 6606c54..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunners.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.RequiresContextInternal;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.common.collect.Iterables;
-
-import java.util.Collection;
-
-/**
- * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
- * for different keyed combine functions.
- */
-public class PerKeyCombineFnRunners {
-  /**
-   * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
-   */
-  public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
-  create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
-    if (perKeyCombineFn instanceof RequiresContextInternal) {
-      return new KeyedCombineFnWithContextRunner<>(
-          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
-    } else {
-      return new KeyedCombineFnRunner<>(
-          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
-    }
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
-   *
-   * <p>It forwards function calls to the {@link KeyedCombineFn}.
-   */
-  private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
-      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
-    private KeyedCombineFnRunner(
-        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
-      this.keyedCombineFn = keyedCombineFn;
-    }
-
-    @Override
-    public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() {
-      return keyedCombineFn;
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, DoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.createAccumulator(key);
-    }
-
-    @Override
-    public AccumT addInput(
-        K key, AccumT accumulator, InputT input, DoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.addInput(key, accumulator, input);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(
-        K key, Iterable<AccumT> accumulators, DoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.extractOutput(key, accumulator);
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.compact(key, accumulator);
-    }
-
-    @Override
-    public OutputT apply(K key, Iterable<? extends InputT> inputs, DoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.apply(key, inputs);
-    }
-
-    @Override
-    public AccumT addInputs(K key, Iterable<InputT> inputs, DoFn<?, ?>.ProcessContext c) {
-      AccumT accum = keyedCombineFn.createAccumulator(key);
-      for (InputT input : inputs) {
-        accum = keyedCombineFn.addInput(key, accum, input);
-      }
-      return accum;
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFn.toString();
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.createAccumulator(key);
-    }
-
-    @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.addInput(key, accumulator, input);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.extractOutput(key, accumulator);
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.compact(key, accumulator);
-    }
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
-   *
-   * <p>It forwards function calls to the {@link KeyedCombineFnWithContext}.
-   */
-  private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
-      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
-
-    private KeyedCombineFnWithContextRunner(
-        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
-      this.keyedCombineFnWithContext = keyedCombineFnWithContext;
-    }
-
-    @Override
-    public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() {
-      return keyedCombineFnWithContext;
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, DoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.createAccumulator(key,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT addInput(
-        K key, AccumT accumulator, InputT value, DoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.addInput(key, accumulator, value,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(
-        K key, Iterable<AccumT> accumulators, DoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.mergeAccumulators(
-          key, accumulators, CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.extractOutput(key, accumulator,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.compact(key, accumulator,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public OutputT apply(K key, Iterable<? extends InputT> inputs, DoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.apply(key, inputs,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT addInputs(K key, Iterable<InputT> inputs, DoFn<?, ?>.ProcessContext c) {
-      CombineWithContext.Context combineContext = CombineContextFactory.createFromProcessContext(c);
-      AccumT accum = keyedCombineFnWithContext.createAccumulator(key, combineContext);
-      for (InputT input : inputs) {
-        accum = keyedCombineFnWithContext.addInput(key, accum, input, combineContext);
-      }
-      return accum;
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFnWithContext.toString();
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
-        Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.createAccumulator(key,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.addInput(key, accumulator, input,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.extractOutput(key, accumulator,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.compact(key, accumulator,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-  }
-}
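
Usage is a single factory call; the instanceof check on RequiresContextInternal decides
which wrapper is returned. A hypothetical fragment, assuming sumFn is a
KeyedCombineFn<String, Integer, int[], Integer> and processContext is an in-scope
DoFn ProcessContext:

    PerKeyCombineFnRunner<String, Integer, int[], Integer> runner =
        PerKeyCombineFnRunners.create(sumFn);  // a plain KeyedCombineFn gets KeyedCombineFnRunner
    Integer total = runner.apply("user-1", java.util.Arrays.asList(1, 2, 3), processContext);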

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
deleted file mode 100644
index 81572ea..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-/**
- * Constant property names used by the SDK in CloudWorkflow specifications.
- */
-public class PropertyNames {
-  public static final String ALLOWED_ENCODINGS = "allowed_encodings";
-  public static final String APPEND_TRAILING_NEWLINES = "append_trailing_newlines";
-  public static final String BIGQUERY_CREATE_DISPOSITION = "create_disposition";
-  public static final String BIGQUERY_DATASET = "dataset";
-  public static final String BIGQUERY_PROJECT = "project";
-  public static final String BIGQUERY_SCHEMA = "schema";
-  public static final String BIGQUERY_TABLE = "table";
-  public static final String BIGQUERY_QUERY = "bigquery_query";
-  public static final String BIGQUERY_FLATTEN_RESULTS = "bigquery_flatten_results";
-  public static final String BIGQUERY_WRITE_DISPOSITION = "write_disposition";
-  public static final String BIGQUERY_EXPORT_FORMAT = "bigquery_export_format";
-  public static final String BIGQUERY_EXPORT_SCHEMA = "bigquery_export_schema";
-  public static final String CO_GBK_RESULT_SCHEMA = "co_gbk_result_schema";
-  public static final String COMBINE_FN = "combine_fn";
-  public static final String COMPONENT_ENCODINGS = "component_encodings";
-  public static final String COMPRESSION_TYPE = "compression_type";
-  public static final String CUSTOM_SOURCE_FORMAT = "custom_source";
-  public static final String CONCAT_SOURCE_SOURCES = "sources";
-  public static final String CONCAT_SOURCE_BASE_SPECS = "base_specs";
-  public static final String SOURCE_STEP_INPUT = "custom_source_step_input";
-  public static final String SOURCE_SPEC = "spec";
-  public static final String SOURCE_METADATA = "metadata";
-  public static final String SOURCE_DOES_NOT_NEED_SPLITTING = "does_not_need_splitting";
-  public static final String SOURCE_PRODUCES_SORTED_KEYS = "produces_sorted_keys";
-  public static final String SOURCE_IS_INFINITE = "is_infinite";
-  public static final String SOURCE_ESTIMATED_SIZE_BYTES = "estimated_size_bytes";
-  public static final String ELEMENT = "element";
-  public static final String ELEMENTS = "elements";
-  public static final String ENCODING = "encoding";
-  public static final String ENCODING_ID = "encoding_id";
-  public static final String END_INDEX = "end_index";
-  public static final String END_OFFSET = "end_offset";
-  public static final String END_SHUFFLE_POSITION = "end_shuffle_position";
-  public static final String ENVIRONMENT_VERSION_JOB_TYPE_KEY = "job_type";
-  public static final String ENVIRONMENT_VERSION_MAJOR_KEY = "major";
-  public static final String FILENAME = "filename";
-  public static final String FILENAME_PREFIX = "filename_prefix";
-  public static final String FILENAME_SUFFIX = "filename_suffix";
-  public static final String FILEPATTERN = "filepattern";
-  public static final String FOOTER = "footer";
-  public static final String FORMAT = "format";
-  public static final String HEADER = "header";
-  public static final String INPUTS = "inputs";
-  public static final String INPUT_CODER = "input_coder";
-  public static final String IS_GENERATED = "is_generated";
-  public static final String IS_MERGING_WINDOW_FN = "is_merging_window_fn";
-  public static final String IS_PAIR_LIKE = "is_pair_like";
-  public static final String IS_STREAM_LIKE = "is_stream_like";
-  public static final String IS_WRAPPER = "is_wrapper";
-  public static final String DISALLOW_COMBINER_LIFTING = "disallow_combiner_lifting";
-  public static final String NON_PARALLEL_INPUTS = "non_parallel_inputs";
-  public static final String NUM_SHARD_CODERS = "num_shard_coders";
-  public static final String NUM_METADATA_SHARD_CODERS = "num_metadata_shard_coders";
-  public static final String NUM_SHARDS = "num_shards";
-  public static final String OBJECT_TYPE_NAME = "@type";
-  public static final String OUTPUT = "output";
-  public static final String OUTPUT_INFO = "output_info";
-  public static final String OUTPUT_NAME = "output_name";
-  public static final String PARALLEL_INPUT = "parallel_input";
-  public static final String PHASE = "phase";
-  public static final String PUBSUB_ID_LABEL = "pubsub_id_label";
-  public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
-  public static final String PUBSUB_TIMESTAMP_LABEL = "pubsub_timestamp_label";
-  public static final String PUBSUB_TOPIC = "pubsub_topic";
-  public static final String SCALAR_FIELD_NAME = "value";
-  public static final String SERIALIZED_FN = "serialized_fn";
-  public static final String SHARD_NAME_TEMPLATE = "shard_template";
-  public static final String SHUFFLE_KIND = "shuffle_kind";
-  public static final String SHUFFLE_READER_CONFIG = "shuffle_reader_config";
-  public static final String SHUFFLE_WRITER_CONFIG = "shuffle_writer_config";
-  public static final String SORT_VALUES = "sort_values";
-  public static final String START_INDEX = "start_index";
-  public static final String START_OFFSET = "start_offset";
-  public static final String START_SHUFFLE_POSITION = "start_shuffle_position";
-  public static final String STRIP_TRAILING_NEWLINES = "strip_trailing_newlines";
-  public static final String TUPLE_TAGS = "tuple_tags";
-  public static final String USE_INDEXED_FORMAT = "use_indexed_format";
-  public static final String USER_FN = "user_fn";
-  public static final String USER_NAME = "user_name";
-  public static final String USES_KEYED_STATE = "uses_keyed_state";
-  public static final String VALIDATE_SINK = "validate_sink";
-  public static final String VALIDATE_SOURCE = "validate_source";
-  public static final String VALUE = "value";
-  public static final String DISPLAY_DATA = "display_data";
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/RandomAccessData.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/RandomAccessData.java
deleted file mode 100644
index 6c96c8e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/RandomAccessData.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.common.base.MoreObjects;
-import com.google.common.io.ByteStreams;
-import com.google.common.primitives.UnsignedBytes;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Comparator;
-
-import javax.annotation.concurrent.NotThreadSafe;
-
-/**
- * An elastic-sized byte array that can be manipulated as a stream or accessed
- * directly. This allows bytes to move quickly from an {@link InputStream} into this
- * wrapper, which can then be used as an {@link OutputStream}, and vice versa. This wrapper
- * also provides random access to the bytes stored within, and lets users finely
- * control the number of byte copies that occur.
- *
- * <p>Anything stored within the in-memory buffer from offset {@link #size()} is considered temporary
- * unused storage.
- */
-@NotThreadSafe
-public class RandomAccessData {
-  /**
-   * A {@link Coder} which encodes the valid parts of this stream.
-   * This follows the same encoding scheme as {@link ByteArrayCoder}.
-   * This coder is deterministic and consistent with equals.
-   *
- * <p>This coder does not support encoding positive infinity.
-   */
-  public static class RandomAccessDataCoder extends AtomicCoder<RandomAccessData> {
-    private static final RandomAccessDataCoder INSTANCE = new RandomAccessDataCoder();
-
-    @JsonCreator
-    public static RandomAccessDataCoder of() {
-      return INSTANCE;
-    }
-
-    @Override
-    public void encode(RandomAccessData value, OutputStream outStream, Coder.Context context)
-        throws CoderException, IOException {
-      if (value == POSITIVE_INFINITY) {
-        throw new CoderException("Positive infinity can not be encoded.");
-      }
-      if (!context.isWholeStream) {
-        VarInt.encode(value.size, outStream);
-      }
-      value.writeTo(outStream, 0, value.size);
-    }
-
-    @Override
-    public RandomAccessData decode(InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      RandomAccessData rval = new RandomAccessData();
-      if (!context.isWholeStream) {
-        int length = VarInt.decodeInt(inStream);
-        rval.readFrom(inStream, 0, length);
-      } else {
-        ByteStreams.copy(inStream, rval.asOutputStream());
-      }
-      return rval;
-    }
-
-    @Override
-    public boolean consistentWithEquals() {
-      return true;
-    }
-
-    @Override
-    public boolean isRegisterByteSizeObserverCheap(
-        RandomAccessData value, Coder.Context context) {
-      return true;
-    }
-
-    @Override
-    protected long getEncodedElementByteSize(RandomAccessData value, Coder.Context context)
-        throws Exception {
-      if (value == null) {
-        throw new CoderException("cannot encode a null in memory stream");
-      }
-      long size = 0;
-      if (!context.isWholeStream) {
-        size += VarInt.getLength(value.size);
-      }
-      return size + value.size;
-    }
-  }
-
-  public static final UnsignedLexicographicalComparator UNSIGNED_LEXICOGRAPHICAL_COMPARATOR =
-      new UnsignedLexicographicalComparator();
-
-  /**
-   * A {@link Comparator} that compares two byte arrays lexicographically. It compares
-   * values as lists of unsigned bytes, ordering by the first pair of bytes that differ after
-   * any common prefix; when one array is a prefix of the other, the shorter array is the lesser.
-   * For example, [] < [0x01] < [0x01, 0x7F] < [0x01, 0x80] < [0x02] < POSITIVE INFINITY.
-   *
-   * <p>Note that a token type of positive infinity is supported and is greater than
-   * all other {@link RandomAccessData}.
-   */
-  public static final class UnsignedLexicographicalComparator
-      implements Comparator<RandomAccessData> {
-    // Do not instantiate
-    private UnsignedLexicographicalComparator() {
-    }
-
-    @Override
-    public int compare(RandomAccessData o1, RandomAccessData o2) {
-      return compare(o1, o2, 0 /* start from the beginning */);
-    }
-
-    /**
-     * Compare the two sets of bytes starting at the given offset.
-     */
-    public int compare(RandomAccessData o1, RandomAccessData o2, int startOffset) {
-      if (o1 == o2) {
-        return 0;
-      }
-      if (o1 == POSITIVE_INFINITY) {
-        return 1;
-      }
-      if (o2 == POSITIVE_INFINITY) {
-        return -1;
-      }
-
-      int minBytesLen = Math.min(o1.size, o2.size);
-      for (int i = startOffset; i < minBytesLen; i++) {
-        // unsigned comparison
-        int b1 = o1.buffer[i] & 0xFF;
-        int b2 = o2.buffer[i] & 0xFF;
-        if (b1 == b2) {
-          continue;
-        }
-        // Return the stream with the smaller byte as the smaller value.
-        return b1 - b2;
-      }
-      // If one is a prefix of the other, return the shorter one as the smaller one.
-      // If both lengths are equal, then both streams are equal.
-      return o1.size - o2.size;
-    }
-
-    /**
-     * Compute the length of the common prefix of the two provided sets of bytes.
-     */
-    public int commonPrefixLength(RandomAccessData o1, RandomAccessData o2) {
-      int minBytesLen = Math.min(o1.size, o2.size);
-      for (int i = 0; i < minBytesLen; i++) {
-        // unsigned comparison
-        int b1 = o1.buffer[i] & 0xFF;
-        int b2 = o2.buffer[i] & 0xFF;
-        if (b1 != b2) {
-          return i;
-        }
-      }
-      return minBytesLen;
-    }
-  }
-
-  /** A token type representing positive infinity. */
-  static final RandomAccessData POSITIVE_INFINITY = new RandomAccessData(0);
-
-  /**
-   * Returns a RandomAccessData that is the smallest value of the same length which
-   * is strictly greater than this. Note that if this is empty or is all 0xFF then
-   * a token value of positive infinity is returned.
-   *
-   * <p>The {@link UnsignedLexicographicalComparator} supports comparing {@link RandomAccessData}
-   * values, including positive infinity.
-   */
-  public RandomAccessData increment() throws IOException {
-    RandomAccessData copy = copy();
-    for (int i = copy.size - 1; i >= 0; --i) {
-      if (copy.buffer[i] != UnsignedBytes.MAX_VALUE) {
-        copy.buffer[i] = UnsignedBytes.checkedCast(UnsignedBytes.toInt(copy.buffer[i]) + 1);
-        return copy;
-      }
-    }
-    return POSITIVE_INFINITY;
-  }
-
-  private static final int DEFAULT_INITIAL_BUFFER_SIZE = 128;
-
-  /** Constructs a RandomAccessData with a default buffer size. */
-  public RandomAccessData() {
-    this(DEFAULT_INITIAL_BUFFER_SIZE);
-  }
-
-  /** Constructs a RandomAccessData with the initial buffer. */
-  public RandomAccessData(byte[] initialBuffer) {
-    checkNotNull(initialBuffer);
-    this.buffer = initialBuffer;
-    this.size = initialBuffer.length;
-  }
-
-  /** Constructs a RandomAccessData with the given buffer size. */
-  public RandomAccessData(int initialBufferSize) {
-    checkArgument(initialBufferSize >= 0, "Expected initial buffer size to be non-negative.");
-    this.buffer = new byte[initialBufferSize];
-  }
-
-  private byte[] buffer;
-  private int size;
-
-  /** Returns the backing array. */
-  public byte[] array() {
-    return buffer;
-  }
-
-  /** Returns the number of bytes in the backing array that are valid. */
-  public int size() {
-    return size;
-  }
-
-  /** Resets the end of the stream to the specified position. */
-  public void resetTo(int position) {
-    ensureCapacity(position);
-    size = position;
-  }
-
-  private final OutputStream outputStream = new OutputStream() {
-    @Override
-    public void write(int b) throws IOException {
-      ensureCapacity(size + 1);
-      buffer[size] = (byte) b;
-      size += 1;
-    }
-
-    @Override
-    public void write(byte[] b, int offset, int length) throws IOException {
-      ensureCapacity(size + length);
-      System.arraycopy(b, offset, buffer, size, length);
-      size += length;
-    }
-  };
-
-  /**
-   * Returns an output stream which writes to the backing buffer from the current position.
-   * Note that the internal buffer will grow as required to accommodate all data written.
-   */
-  public OutputStream asOutputStream() {
-    return outputStream;
-  }
-
-  /**
-   * Returns an {@link InputStream} wrapper which supplies the portion of this backing byte buffer
-   * starting at {@code offset} and up to {@code length} bytes. Note that the returned
-   * {@link InputStream} is only a wrapper and any modifications to the underlying
-   * {@link RandomAccessData} will be visible to the {@link InputStream}.
-   */
-  public InputStream asInputStream(final int offset, final int length) {
-    return new ByteArrayInputStream(buffer, offset, length);
-  }
-
-  /**
-   * Writes {@code length} bytes starting at {@code offset} from the backing data store to the
-   * specified output stream.
-   */
-  public void writeTo(OutputStream out, int offset, int length) throws IOException {
-    out.write(buffer, offset, length);
-  }
-
-  /**
-   * Reads {@code length} bytes from the specified input stream, writing them into the backing
-   * data store starting at {@code offset}.
-   *
-   * <p>Note that the in-memory buffer will be grown to ensure there is enough capacity.
-   */
-  public void readFrom(InputStream inStream, int offset, int length) throws IOException {
-    ensureCapacity(offset + length);
-    ByteStreams.readFully(inStream, buffer, offset, length);
-    size = offset + length;
-  }
-
-  /** Returns a copy of this RandomAccessData. */
-  public RandomAccessData copy() throws IOException {
-    RandomAccessData copy = new RandomAccessData(size);
-    writeTo(copy.asOutputStream(), 0, size);
-    return copy;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == this) {
-      return true;
-    }
-    if (!(other instanceof RandomAccessData)) {
-      return false;
-    }
-    return UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(this, (RandomAccessData) other) == 0;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = 1;
-    for (int i = 0; i < size; ++i) {
-      result = 31 * result + buffer[i];
-    }
-    return result;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .add("buffer", Arrays.copyOf(buffer, size))
-        .add("size", size)
-        .toString();
-  }
-
-  private void ensureCapacity(int minCapacity) {
-    // If we have enough space, don't grow the buffer.
-    if (minCapacity <= buffer.length) {
-      return;
-    }
-
-    // Try to double the size of the buffer; if that's not enough, just use the new capacity.
-    // The multiplication is done in long and capped at Integer.MAX_VALUE so it cannot overflow.
-    int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.length * 2L);
-    if (newCapacity < minCapacity) {
-      newCapacity = minCapacity;
-    }
-    buffer = Arrays.copyOf(buffer, newCapacity);
-  }
-}
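
For context on the class removed above: the pairing of a growable byte buffer with an
unsigned lexicographic comparator is typically used to derive exclusive upper bounds for
key-range scans. A minimal sketch (not part of this commit; it assumes the public
UNSIGNED_LEXICOGRAPHICAL_COMPARATOR field declared earlier in the same file):

    import com.google.cloud.dataflow.sdk.util.RandomAccessData;

    public class RandomAccessDataDemo {
      public static void main(String[] args) throws Exception {
        // Build up a key by writing through the buffer's output stream view.
        RandomAccessData key = new RandomAccessData();
        key.asOutputStream().write(new byte[] {'k', 'e', 'y'});

        // increment() yields a same-length value strictly greater than the key,
        // usable as an exclusive upper bound when scanning a key range.
        RandomAccessData upperBound = key.increment();  // bytes {'k', 'e', 'z'}

        // The unsigned comparator orders the two as expected.
        int cmp = RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR
            .compare(key, upperBound);
        System.out.println(cmp < 0);  // prints: true
      }
    }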

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFn.java
deleted file mode 100644
index c5ef2ea..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFn.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.ReadableState;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-
-/**
- * Specification for processing to happen after elements have been grouped by key.
- *
- * @param <K> The type of key being processed.
- * @param <InputT> The type of input values associated with the key.
- * @param <OutputT> The output type that will be produced for each key.
- * @param <W> The type of windows this operates on.
- */
-public abstract class ReduceFn<K, InputT, OutputT, W extends BoundedWindow>
-    implements Serializable {
-
-  /** Information accessible to all the processing methods in this {@code ReduceFn}. */
-  public abstract class Context {
-    /** Return the key that is being processed. */
-    public abstract K key();
-
-    /** The window that is being processed. */
-    public abstract W window();
-
-    /** Access the current {@link WindowingStrategy}. */
-    public abstract WindowingStrategy<?, W> windowingStrategy();
-
-    /** Return the interface for accessing state. */
-    public abstract StateAccessor<K> state();
-
-    /** Return the interface for accessing timers. */
-    public abstract Timers timers();
-  }
-
-  /** Information accessible within {@link #processValue}. */
-  public abstract class ProcessValueContext extends Context {
-    /** Return the actual value being processed. */
-    public abstract InputT value();
-
-    /** Return the timestamp associated with the value. */
-    public abstract Instant timestamp();
-  }
-
-  /** Information accessible within {@link #onMerge}. */
-  public abstract class OnMergeContext extends Context {
-    /** Return the interface for accessing state. */
-    @Override
-    public abstract MergingStateAccessor<K, W> state();
-  }
-
-  /** Information accessible within {@link #onTrigger}. */
-  public abstract class OnTriggerContext extends Context {
-    /** Returns the {@link PaneInfo} for the trigger firing being processed. */
-    public abstract PaneInfo paneInfo();
-
-    /** Output the given value in the current window. */
-    public abstract void output(OutputT value);
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Called for each value of type {@code InputT} associated with the current key.
-   */
-  public abstract void processValue(ProcessValueContext c) throws Exception;
-
-  /**
-   * Called when windows are merged.
-   */
-  public abstract void onMerge(OnMergeContext context) throws Exception;
-
-  /**
-   * Called when triggers fire.
-   *
-   * <p>Implementations of {@link ReduceFn} should call {@link OnTriggerContext#output} to emit
-   * any results that should be included in the pane produced by this trigger firing.
-   */
-  public abstract void onTrigger(OnTriggerContext context) throws Exception;
-
-  /**
-   * Called before {@link #onMerge} is invoked to provide an opportunity to prefetch any needed
-   * state.
-   *
-   * @param c Context to prefetch from.
-   */
-  public void prefetchOnMerge(MergingStateAccessor<K, W> c) throws Exception {}
-
-  /**
-   * Called before {@link #onTrigger} is invoked to provide an opportunity to prefetch any needed
-   * state.
-   *
-   * @param context Context to prefetch from.
-   */
-  public void prefetchOnTrigger(StateAccessor<K> context) {}
-
-  /**
-   * Called to clear any persisted state that the {@link ReduceFn} may be holding. This will be
-   * called when the window is closing and will receive no further interactions.
-   */
-  public abstract void clearState(Context context) throws Exception;
-
-  /**
-   * Returns true if there is no buffered state.
-   */
-  public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context);
-}
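
To make the lifecycle above concrete, here is a minimal sketch of a ReduceFn that sums
the Integer values seen in each window (hypothetical, not part of this commit; it assumes
the ValueState/StateTags API from this SDK's util.state package):

    import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
    import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
    import com.google.cloud.dataflow.sdk.util.ReduceFn;
    import com.google.cloud.dataflow.sdk.util.state.ReadableState;
    import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
    import com.google.cloud.dataflow.sdk.util.state.StateTag;
    import com.google.cloud.dataflow.sdk.util.state.StateTags;
    import com.google.cloud.dataflow.sdk.util.state.ValueState;

    class SumReduceFn<K, W extends BoundedWindow> extends ReduceFn<K, Integer, Integer, W> {
      private static final StateTag<Object, ValueState<Integer>> SUM =
          StateTags.value("sum", VarIntCoder.of());

      @Override
      public void processValue(ProcessValueContext c) throws Exception {
        // Fold each incoming value into the running sum for c.window().
        ValueState<Integer> sum = c.state().access(SUM);
        Integer current = sum.read();
        sum.write((current == null ? 0 : current) + c.value());
      }

      @Override
      public void onMerge(OnMergeContext c) throws Exception {
        // Combine the per-window sums into the merge result window.
        int merged = 0;
        for (ValueState<Integer> s : c.state().accessInEachMergingWindow(SUM).values()) {
          Integer v = s.read();
          merged += (v == null) ? 0 : v;
        }
        c.state().access(SUM).write(merged);
      }

      @Override
      public void onTrigger(OnTriggerContext c) throws Exception {
        // Emit the current sum as part of the pane for this trigger firing.
        Integer sum = c.state().access(SUM).read();
        c.output(sum == null ? 0 : sum);
      }

      @Override
      public void clearState(Context c) throws Exception {
        c.state().access(SUM).clear();
      }

      @Override
      public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
        final ValueState<Integer> sum = state.access(SUM);
        return new ReadableState<Boolean>() {
          @Override
          public Boolean read() {
            return sum.read() == null;
          }

          @Override
          public ReadableState<Boolean> readLater() {
            sum.readLater();
            return this;
          }
        };
      }
    }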

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
deleted file mode 100644
index bdbaf10..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.ReadableState;
-import com.google.cloud.dataflow.sdk.util.state.State;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateContext;
-import com.google.cloud.dataflow.sdk.util.state.StateContexts;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespaces.WindowNamespace;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * Factory for creating instances of the various {@link ReduceFn} contexts.
- */
-class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
-  public interface OnTriggerCallbacks<OutputT> {
-    void output(OutputT toOutput);
-  }
-
-  private final K key;
-  private final ReduceFn<K, InputT, OutputT, W> reduceFn;
-  private final WindowingStrategy<?, W> windowingStrategy;
-  private final StateInternals<K> stateInternals;
-  private final ActiveWindowSet<W> activeWindows;
-  private final TimerInternals timerInternals;
-  private final WindowingInternals<?, ?> windowingInternals;
-  private final PipelineOptions options;
-
-  ReduceFnContextFactory(K key, ReduceFn<K, InputT, OutputT, W> reduceFn,
-      WindowingStrategy<?, W> windowingStrategy, StateInternals<K> stateInternals,
-      ActiveWindowSet<W> activeWindows, TimerInternals timerInternals,
-      WindowingInternals<?, ?> windowingInternals, PipelineOptions options) {
-    this.key = key;
-    this.reduceFn = reduceFn;
-    this.windowingStrategy = windowingStrategy;
-    this.stateInternals = stateInternals;
-    this.activeWindows = activeWindows;
-    this.timerInternals = timerInternals;
-    this.windowingInternals = windowingInternals;
-    this.options = options;
-  }
-
-  /** Where should we look for state associated with a given window? */
-  public enum StateStyle {
-    /** All state is associated with the window itself. */
-    DIRECT,
-    /** State is associated with the 'state address' windows tracked by the active window set. */
-    RENAMED
-  }
-
-  private StateAccessorImpl<K, W> stateAccessor(W window, StateStyle style) {
-    return new StateAccessorImpl<K, W>(
-        activeWindows, windowingStrategy.getWindowFn().windowCoder(),
-        stateInternals, StateContexts.createFromComponents(options, windowingInternals, window),
-        style);
-  }
-
-  public ReduceFn<K, InputT, OutputT, W>.Context base(W window, StateStyle style) {
-    return new ContextImpl(stateAccessor(window, style));
-  }
-
-  public ReduceFn<K, InputT, OutputT, W>.ProcessValueContext forValue(
-      W window, InputT value, Instant timestamp, StateStyle style) {
-    return new ProcessValueContextImpl(stateAccessor(window, style), value, timestamp);
-  }
-
-  public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window,
-      ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
-    return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks);
-  }
-
-  public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forMerge(
-      Collection<W> activeToBeMerged, W mergeResult, StateStyle style) {
-    return new OnMergeContextImpl(
-        new MergingStateAccessorImpl<K, W>(activeWindows,
-            windowingStrategy.getWindowFn().windowCoder(),
-            stateInternals, style, activeToBeMerged, mergeResult));
-  }
-
-  public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forPremerge(W window) {
-    return new OnPremergeContextImpl(new PremergingStateAccessorImpl<K, W>(
-        activeWindows, windowingStrategy.getWindowFn().windowCoder(), stateInternals, window));
-  }
-
-  private class TimersImpl implements Timers {
-    private final StateNamespace namespace;
-
-    public TimersImpl(StateNamespace namespace) {
-      Preconditions.checkArgument(namespace instanceof WindowNamespace,
-          "Expected a window namespace, got %s", namespace);
-      this.namespace = namespace;
-    }
-
-    @Override
-    public void setTimer(Instant timestamp, TimeDomain timeDomain) {
-      timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
-    }
-
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
-      timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timerInternals.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timerInternals.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentEventTime() {
-      return timerInternals.currentInputWatermarkTime();
-    }
-  }
-
-  // ======================================================================
-  // StateAccessors
-  // ======================================================================
-  static class StateAccessorImpl<K, W extends BoundedWindow> implements StateAccessor<K> {
-    protected final ActiveWindowSet<W> activeWindows;
-    protected final StateContext<W> context;
-    protected final StateNamespace windowNamespace;
-    protected final Coder<W> windowCoder;
-    protected final StateInternals<K> stateInternals;
-    protected final StateStyle style;
-
-    public StateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
-        StateInternals<K> stateInternals, StateContext<W> context, StateStyle style) {
-      this.activeWindows = activeWindows;
-      this.windowCoder = windowCoder;
-      this.stateInternals = stateInternals;
-      this.context = checkNotNull(context);
-      this.windowNamespace = namespaceFor(context.window());
-      this.style = style;
-    }
-
-    protected StateNamespace namespaceFor(W window) {
-      return StateNamespaces.window(windowCoder, window);
-    }
-
-    protected StateNamespace windowNamespace() {
-      return windowNamespace;
-    }
-
-    W window() {
-      return context.window();
-    }
-
-    StateNamespace namespace() {
-      return windowNamespace();
-    }
-
-    @Override
-    public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
-      switch (style) {
-        case DIRECT:
-          return stateInternals.state(windowNamespace(), address, context);
-        case RENAMED:
-          return stateInternals.state(
-              namespaceFor(activeWindows.writeStateAddress(context.window())), address, context);
-      }
-      throw new RuntimeException(); // cases are exhaustive.
-    }
-  }
-
-  static class MergingStateAccessorImpl<K, W extends BoundedWindow>
-      extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
-    private final Collection<W> activeToBeMerged;
-
-    public MergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
-        StateInternals<K> stateInternals, StateStyle style, Collection<W> activeToBeMerged,
-        W mergeResult) {
-      super(activeWindows, windowCoder, stateInternals,
-          StateContexts.windowOnly(mergeResult), style);
-      this.activeToBeMerged = activeToBeMerged;
-    }
-
-    @Override
-    public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
-      switch (style) {
-        case DIRECT:
-          return stateInternals.state(windowNamespace(), address, context);
-        case RENAMED:
-          return stateInternals.state(
-              namespaceFor(activeWindows.mergedWriteStateAddress(
-                  activeToBeMerged, context.window())),
-              address,
-              context);
-      }
-      throw new RuntimeException(); // cases are exhaustive.
-    }
-
-    @Override
-    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-        StateTag<? super K, StateT> address) {
-      ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
-      for (W mergingWindow : activeToBeMerged) {
-        StateNamespace namespace = null;
-        switch (style) {
-          case DIRECT:
-            namespace = namespaceFor(mergingWindow);
-            break;
-          case RENAMED:
-            namespace = namespaceFor(activeWindows.writeStateAddress(mergingWindow));
-            break;
-        }
-        Preconditions.checkNotNull(namespace); // cases are exhaustive.
-        builder.put(mergingWindow, stateInternals.state(namespace, address, context));
-      }
-      return builder.build();
-    }
-  }
-
-  static class PremergingStateAccessorImpl<K, W extends BoundedWindow>
-      extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
-    public PremergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
-        StateInternals<K> stateInternals, W window) {
-      super(activeWindows, windowCoder, stateInternals,
-          StateContexts.windowOnly(window), StateStyle.RENAMED);
-    }
-
-    Collection<W> mergingWindows() {
-      return activeWindows.readStateAddresses(context.window());
-    }
-
-    @Override
-    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-        StateTag<? super K, StateT> address) {
-      ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
-      for (W stateAddressWindow : activeWindows.readStateAddresses(context.window())) {
-        StateT stateForWindow =
-            stateInternals.state(namespaceFor(stateAddressWindow), address, context);
-        builder.put(stateAddressWindow, stateForWindow);
-      }
-      return builder.build();
-    }
-  }
-
-  // ======================================================================
-  // Contexts
-  // ======================================================================
-
-  private class ContextImpl extends ReduceFn<K, InputT, OutputT, W>.Context {
-    private final StateAccessorImpl<K, W> state;
-    private final TimersImpl timers;
-
-    private ContextImpl(StateAccessorImpl<K, W> state) {
-      reduceFn.super();
-      this.state = state;
-      this.timers = new TimersImpl(state.namespace());
-    }
-
-    @Override
-    public K key() {
-      return key;
-    }
-
-    @Override
-    public W window() {
-      return state.window();
-    }
-
-    @Override
-    public WindowingStrategy<?, W> windowingStrategy() {
-      return windowingStrategy;
-    }
-
-    @Override
-    public StateAccessor<K> state() {
-      return state;
-    }
-
-    @Override
-    public Timers timers() {
-      return timers;
-    }
-  }
-
-  private class ProcessValueContextImpl
-      extends ReduceFn<K, InputT, OutputT, W>.ProcessValueContext {
-    private final InputT value;
-    private final Instant timestamp;
-    private final StateAccessorImpl<K, W> state;
-    private final TimersImpl timers;
-
-    private ProcessValueContextImpl(StateAccessorImpl<K, W> state,
-        InputT value, Instant timestamp) {
-      reduceFn.super();
-      this.state = state;
-      this.value = value;
-      this.timestamp = timestamp;
-      this.timers = new TimersImpl(state.namespace());
-    }
-
-    @Override
-    public K key() {
-      return key;
-    }
-
-    @Override
-    public W window() {
-      return state.window();
-    }
-
-    @Override
-    public WindowingStrategy<?, W> windowingStrategy() {
-      return windowingStrategy;
-    }
-
-    @Override
-    public StateAccessor<K> state() {
-      return state;
-    }
-
-    @Override
-    public InputT value() {
-      return value;
-    }
-
-    @Override
-    public Instant timestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public Timers timers() {
-      return timers;
-    }
-  }
-
-  private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext {
-    private final StateAccessorImpl<K, W> state;
-    private final ReadableState<PaneInfo> pane;
-    private final OnTriggerCallbacks<OutputT> callbacks;
-    private final TimersImpl timers;
-
-    private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane,
-        OnTriggerCallbacks<OutputT> callbacks) {
-      reduceFn.super();
-      this.state = state;
-      this.pane = pane;
-      this.callbacks = callbacks;
-      this.timers = new TimersImpl(state.namespace());
-    }
-
-    @Override
-    public K key() {
-      return key;
-    }
-
-    @Override
-    public W window() {
-      return state.window();
-    }
-
-    @Override
-    public WindowingStrategy<?, W> windowingStrategy() {
-      return windowingStrategy;
-    }
-
-    @Override
-    public StateAccessor<K> state() {
-      return state;
-    }
-
-    @Override
-    public PaneInfo paneInfo() {
-      return pane.read();
-    }
-
-    @Override
-    public void output(OutputT value) {
-      callbacks.output(value);
-    }
-
-    @Override
-    public Timers timers() {
-      return timers;
-    }
-  }
-
-  private class OnMergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext {
-    private final MergingStateAccessorImpl<K, W> state;
-    private final TimersImpl timers;
-
-    private OnMergeContextImpl(MergingStateAccessorImpl<K, W> state) {
-      reduceFn.super();
-      this.state = state;
-      this.timers = new TimersImpl(state.namespace());
-    }
-
-    @Override
-    public K key() {
-      return key;
-    }
-
-    @Override
-    public WindowingStrategy<?, W> windowingStrategy() {
-      return windowingStrategy;
-    }
-
-    @Override
-    public MergingStateAccessor<K, W> state() {
-      return state;
-    }
-
-    @Override
-    public W window() {
-      return state.window();
-    }
-
-    @Override
-    public Timers timers() {
-      return timers;
-    }
-  }
-
-  private class OnPremergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext {
-    private final PremergingStateAccessorImpl<K, W> state;
-    private final TimersImpl timers;
-
-    private OnPremergeContextImpl(PremergingStateAccessorImpl<K, W> state) {
-      reduceFn.super();
-      this.state = state;
-      this.timers = new TimersImpl(state.namespace());
-    }
-
-    @Override
-    public K key() {
-      return key;
-    }
-
-    @Override
-    public WindowingStrategy<?, W> windowingStrategy() {
-      return windowingStrategy;
-    }
-
-    @Override
-    public MergingStateAccessor<K, W> state() {
-      return state;
-    }
-
-    @Override
-    public W window() {
-      return state.window();
-    }
-
-    @Override
-    public Timers timers() {
-      return timers;
-    }
-  }
-}
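
As a rough sketch of how a caller might drive these contexts (a hypothetical helper, not
part of this commit; since ReduceFnContextFactory and StateStyle are package-private it
would have to live in com.google.cloud.dataflow.sdk.util):

    import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
    import com.google.cloud.dataflow.sdk.util.ReduceFnContextFactory.StateStyle;

    import org.joda.time.Instant;

    class ReduceFnDriver {
      // Feeds one value to the fn. With StateStyle.DIRECT, state reads and writes
      // use the window's own namespace; with StateStyle.RENAMED they go to the
      // 'state address' window chosen by the ActiveWindowSet, which is how merged
      // windows share storage.
      static <K, W extends BoundedWindow> void processOne(
          ReduceFnContextFactory<K, Integer, Integer, W> contexts,
          ReduceFn<K, Integer, Integer, W> fn,
          W window, int value, Instant timestamp) throws Exception {
        fn.processValue(contexts.forValue(window, value, timestamp, StateStyle.DIRECT));
      }
    }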