You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:37 UTC

[13/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CounterAggregator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CounterAggregator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CounterAggregator.java
deleted file mode 100644
index 824825f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CounterAggregator.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Max;
-import com.google.cloud.dataflow.sdk.transforms.Min;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.util.common.Counter;
-import com.google.cloud.dataflow.sdk.util.common.CounterProvider;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-
-/**
- * An implementation of the {@code Aggregator} interface that uses a
- * {@link Counter} as the underlying representation. Supports {@link CombineFn}s
- * from the {@link Sum}, {@link Min} and {@link Max} classes.
- *
- * @param <InputT> the type of input values
- * @param <AccumT> the type of accumulator values
- * @param <OutputT> the type of output value
- */
-public class CounterAggregator<InputT, AccumT, OutputT> implements Aggregator<InputT, OutputT> {
-
-  private final Counter<InputT> counter;
-  private final CombineFn<InputT, AccumT, OutputT> combiner;
-
-  /**
-   * Constructs a new aggregator with the given name and aggregation logic
-   * specified in the CombineFn argument. The underlying counter is
-   * automatically added into the provided CounterSet.
-   *
-   *  <p>If a counter with the same name already exists, it will be reused, as
-   * long as it has the same type.
-   */
-  public CounterAggregator(String name, CombineFn<? super InputT, AccumT, OutputT> combiner,
-      CounterSet.AddCounterMutator addCounterMutator) {
-    // Safe contravariant cast
-    this(constructCounter(name, combiner), addCounterMutator,
-        (CombineFn<InputT, AccumT, OutputT>) combiner);
-  }
-
-  private CounterAggregator(Counter<InputT> counter,
-      CounterSet.AddCounterMutator addCounterMutator,
-      CombineFn<InputT, AccumT, OutputT> combiner) {
-    try {
-      this.counter = addCounterMutator.addCounter(counter);
-    } catch (IllegalArgumentException ex) {
-      throw new IllegalArgumentException(
-          "aggregator's name collides with an existing aggregator "
-          + "or system-provided counter of an incompatible type");
-    }
-    this.combiner = combiner;
-  }
-
-  private static <T> Counter<T> constructCounter(String name,
-      CombineFn<? super T, ?, ?> combiner) {
-    if (combiner instanceof CounterProvider) {
-      @SuppressWarnings("unchecked")
-      CounterProvider<T> counterProvider = (CounterProvider<T>) combiner;
-      return counterProvider.getCounter(name);
-    } else {
-      throw new IllegalArgumentException("unsupported combiner in Aggregator: "
-        + combiner.getClass().getName());
-    }
-  }
-
-  @Override
-  public void addValue(InputT value) {
-    counter.addValue(value);
-  }
-
-  @Override
-  public String getName() {
-    return counter.getName();
-  }
-
-  @Override
-  public CombineFn<InputT, ?, OutputT> getCombineFn() {
-    return combiner;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CredentialFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CredentialFactory.java
deleted file mode 100644
index 4913a1e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CredentialFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.auth.oauth2.Credential;
-
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-
-/**
- * Construct an oauth credential to be used by the SDK and the SDK workers.
- */
-public interface CredentialFactory {
-  public Credential getCredential() throws IOException, GeneralSecurityException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Credentials.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Credentials.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Credentials.java
deleted file mode 100644
index 671b131..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Credentials.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.extensions.java6.auth.oauth2.AbstractPromptReceiver;
-import com.google.api.client.extensions.java6.auth.oauth2.AuthorizationCodeInstalledApp;
-import com.google.api.client.googleapis.auth.oauth2.GoogleAuthorizationCodeFlow;
-import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets;
-import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
-import com.google.api.client.googleapis.auth.oauth2.GoogleOAuthConstants;
-import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
-import com.google.api.client.http.GenericUrl;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.client.util.store.FileDataStoreFactory;
-import com.google.cloud.dataflow.sdk.options.GcpOptions;
-import com.google.common.base.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Provides support for loading credentials.
- */
-public class Credentials {
-
-  private static final Logger LOG = LoggerFactory.getLogger(Credentials.class);
-
-  /**
-   * OAuth 2.0 scopes used by a local worker (not on GCE).
-   * The scope cloud-platform provides access to all Cloud Platform resources.
-   * cloud-platform isn't sufficient yet for talking to datastore so we request
-   * those resources separately.
-   *
-   * <p>Note that trusted scope relationships don't apply to OAuth tokens, so for
-   * services we access directly (GCS) as opposed to through the backend
-   * (BigQuery, GCE), we need to explicitly request that scope.
-   */
-  private static final List<String> SCOPES = Arrays.asList(
-      "https://www.googleapis.com/auth/cloud-platform",
-      "https://www.googleapis.com/auth/devstorage.full_control",
-      "https://www.googleapis.com/auth/userinfo.email",
-      "https://www.googleapis.com/auth/datastore");
-
-  private static class PromptReceiver extends AbstractPromptReceiver {
-    @Override
-    public String getRedirectUri() {
-      return GoogleOAuthConstants.OOB_REDIRECT_URI;
-    }
-  }
-
-  /**
-   * Initializes OAuth2 credentials.
-   *
-   * <p>This can use 3 different mechanisms for obtaining a credential:
-   * <ol>
-   *   <li>
-   *     It can fetch the
-   *     <a href="https://developers.google.com/accounts/docs/application-default-credentials">
-   *     application default credentials</a>.
-   *   </li>
-   *   <li>
-   *     The user can specify a client secrets file and go through the OAuth2
-   *     webflow. The credential will then be cached in the user's home
-   *     directory for reuse. Provide the property "secrets_file" to use this
-   *     mechanism.
-   *   </li>
-   *   <li>
-   *     The user can specify a file containing a service account.
-   *     Provide the properties "service_account_keyfile" and
-   *     "service_account_name" to use this mechanism.
-   *   </li>
-   * </ol>
-   * The default mechanism is to use the
-   * <a href="https://developers.google.com/accounts/docs/application-default-credentials">
-   * application default credentials</a>. The other options can be used by providing the
-   * corresponding properties.
-   */
-  public static Credential getCredential(GcpOptions options)
-      throws IOException, GeneralSecurityException {
-    String keyFile = options.getServiceAccountKeyfile();
-    String accountName = options.getServiceAccountName();
-
-    if (keyFile != null && accountName != null) {
-      try {
-        return getCredentialFromFile(keyFile, accountName, SCOPES);
-      } catch (GeneralSecurityException e) {
-        throw new IOException("Unable to obtain credentials from file", e);
-      }
-    }
-
-    if (options.getSecretsFile() != null) {
-      return getCredentialFromClientSecrets(options, SCOPES);
-    }
-
-    try {
-      return GoogleCredential.getApplicationDefault().createScoped(SCOPES);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to get application default credentials. Please see "
-          + "https://developers.google.com/accounts/docs/application-default-credentials "
-          + "for details on how to specify credentials. This version of the SDK is "
-          + "dependent on the gcloud core component version 2015.02.05 or newer to "
-          + "be able to get credentials from the currently authorized user via gcloud auth.", e);
-    }
-  }
-
-  /**
-   * Loads OAuth2 credential from a local file.
-   */
-  private static Credential getCredentialFromFile(
-      String keyFile, String accountId, Collection<String> scopes)
-      throws IOException, GeneralSecurityException {
-    GoogleCredential credential = new GoogleCredential.Builder()
-        .setTransport(Transport.getTransport())
-        .setJsonFactory(Transport.getJsonFactory())
-        .setServiceAccountId(accountId)
-        .setServiceAccountScopes(scopes)
-        .setServiceAccountPrivateKeyFromP12File(new File(keyFile))
-        .build();
-
-    LOG.info("Created credential from file {}", keyFile);
-    return credential;
-  }
-
-  /**
-   * Loads OAuth2 credential from client secrets, which may require an
-   * interactive authorization prompt.
-   */
-  private static Credential getCredentialFromClientSecrets(
-      GcpOptions options, Collection<String> scopes)
-      throws IOException, GeneralSecurityException {
-    String clientSecretsFile = options.getSecretsFile();
-
-    Preconditions.checkArgument(clientSecretsFile != null);
-    HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
-
-    JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
-    GoogleClientSecrets clientSecrets;
-
-    try {
-      clientSecrets = GoogleClientSecrets.load(jsonFactory,
-          new FileReader(clientSecretsFile));
-    } catch (IOException e) {
-      throw new RuntimeException(
-          "Could not read the client secrets from file: " + clientSecretsFile,
-          e);
-    }
-
-    FileDataStoreFactory dataStoreFactory =
-        new FileDataStoreFactory(new java.io.File(options.getCredentialDir()));
-
-    GoogleAuthorizationCodeFlow flow = new GoogleAuthorizationCodeFlow.Builder(
-        httpTransport, jsonFactory, clientSecrets, scopes)
-        .setDataStoreFactory(dataStoreFactory)
-        .setTokenServerUrl(new GenericUrl(options.getTokenServerUrl()))
-        .setAuthorizationServerEncodedUrl(options.getAuthorizationServerEncodedUrl())
-        .build();
-
-    // The credentialId identifies the credential if we're using a persistent
-    // credential store.
-    Credential credential =
-        new AuthorizationCodeInstalledApp(flow, new PromptReceiver())
-            .authorize(options.getCredentialId());
-
-    LOG.info("Got credential from client secret");
-    return credential;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java
deleted file mode 100644
index cfb120c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-
-/**
- * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
- */
-public class DataflowPathValidator implements PathValidator {
-
-  private DataflowPipelineOptions dataflowOptions;
-
-  DataflowPathValidator(DataflowPipelineOptions options) {
-    this.dataflowOptions = options;
-  }
-
-  public static DataflowPathValidator fromOptions(PipelineOptions options) {
-    return new DataflowPathValidator(options.as(DataflowPipelineOptions.class));
-  }
-
-  /**
-   * Validates the the input GCS path is accessible and that the path
-   * is well formed.
-   */
-  @Override
-  public String validateInputFilePatternSupported(String filepattern) {
-    GcsPath gcsPath = getGcsPath(filepattern);
-    Preconditions.checkArgument(
-        dataflowOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
-    String returnValue = verifyPath(filepattern);
-    verifyPathIsAccessible(filepattern, "Could not find file %s");
-    return returnValue;
-  }
-
-  /**
-   * Validates the the output GCS path is accessible and that the path
-   * is well formed.
-   */
-  @Override
-  public String validateOutputFilePrefixSupported(String filePrefix) {
-    String returnValue = verifyPath(filePrefix);
-    verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
-    return returnValue;
-  }
-
-  @Override
-  public String verifyPath(String path) {
-    GcsPath gcsPath = getGcsPath(path);
-    Preconditions.checkArgument(gcsPath.isAbsolute(),
-        "Must provide absolute paths for Dataflow");
-    Preconditions.checkArgument(!gcsPath.getObject().contains("//"),
-        "Dataflow Service does not allow objects with consecutive slashes");
-    return gcsPath.toResourceName();
-  }
-
-  private void verifyPathIsAccessible(String path, String errorMessage) {
-    GcsPath gcsPath = getGcsPath(path);
-    try {
-      Preconditions.checkArgument(dataflowOptions.getGcsUtil().bucketExists(gcsPath),
-        errorMessage, path);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
-          e);
-    }
-  }
-
-  private GcsPath getGcsPath(String path) {
-    try {
-      return GcsPath.fromUri(path);
-    } catch (IllegalArgumentException e) {
-      throw new IllegalArgumentException(String.format(
-          "%s expected a valid 'gs://' path but was given '%s'",
-          dataflowOptions.getRunner().getSimpleName(), path), e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowReleaseInfo.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowReleaseInfo.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowReleaseInfo.java
deleted file mode 100644
index 39b3005..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowReleaseInfo.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.util.Key;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Properties;
-
-/**
- * Utilities for working with the Dataflow distribution.
- */
-public final class DataflowReleaseInfo extends GenericJson {
-  private static final Logger LOG = LoggerFactory.getLogger(DataflowReleaseInfo.class);
-
-  private static final String DATAFLOW_PROPERTIES_PATH =
-      "/com/google/cloud/dataflow/sdk/sdk.properties";
-
-  private static class LazyInit {
-    private static final DataflowReleaseInfo INSTANCE =
-        new DataflowReleaseInfo(DATAFLOW_PROPERTIES_PATH);
-  }
-
-  /**
-   * Returns an instance of DataflowReleaseInfo.
-   */
-  public static DataflowReleaseInfo getReleaseInfo() {
-    return LazyInit.INSTANCE;
-  }
-
-  @Key private String name = "Google Cloud Dataflow Java SDK";
-  @Key private String version = "Unknown";
-
-  /** Provides the SDK name. */
-  public String getName() {
-    return name;
-  }
-
-  /** Provides the SDK version. */
-  public String getVersion() {
-    return version;
-  }
-
-  private DataflowReleaseInfo(String resourcePath) {
-    Properties properties = new Properties();
-
-    InputStream in = DataflowReleaseInfo.class.getResourceAsStream(
-        DATAFLOW_PROPERTIES_PATH);
-    if (in == null) {
-      LOG.warn("Dataflow properties resource not found: {}", resourcePath);
-      return;
-    }
-
-    try {
-      properties.load(in);
-    } catch (IOException e) {
-      LOG.warn("Error loading Dataflow properties resource: ", e);
-    }
-
-    for (String name : properties.stringPropertyNames()) {
-      if (name.equals("name")) {
-        // We don't allow the properties to override the SDK name.
-        continue;
-      }
-      put(name, properties.getProperty(name));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectModeExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectModeExecutionContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectModeExecutionContext.java
deleted file mode 100644
index 6e97053..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectModeExecutionContext.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.ValueWithMetadata;
-import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
-import com.google.cloud.dataflow.sdk.util.state.InMemoryStateInternals;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * {@link ExecutionContext} for use in direct mode.
- */
-public class DirectModeExecutionContext
-    extends BaseExecutionContext<DirectModeExecutionContext.StepContext> {
-
-  private Object key;
-  private List<ValueWithMetadata<?>> output = Lists.newArrayList();
-  private Map<TupleTag<?>, List<ValueWithMetadata<?>>> sideOutputs = Maps.newHashMap();
-
-  protected DirectModeExecutionContext() {}
-
-  public static DirectModeExecutionContext create() {
-    return new DirectModeExecutionContext();
-  }
-
-  @Override
-  protected StepContext createStepContext(
-      String stepName, String transformName, StateSampler stateSampler) {
-    return new StepContext(this, stepName, transformName);
-  }
-
-  public Object getKey() {
-    return key;
-  }
-
-  public void setKey(Object newKey) {
-    // The direct mode runner may reorder elements, so we need to keep
-    // around the state used for each key.
-    for (ExecutionContext.StepContext stepContext : getAllStepContexts()) {
-      ((StepContext) stepContext).switchKey(newKey);
-    }
-    key = newKey;
-  }
-
-  @Override
-  public void noteOutput(WindowedValue<?> outputElem) {
-    output.add(ValueWithMetadata.of(outputElem).withKey(getKey()));
-  }
-
-  @Override
-  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> outputElem) {
-    List<ValueWithMetadata<?>> output = sideOutputs.get(tag);
-    if (output == null) {
-      output = Lists.newArrayList();
-      sideOutputs.put(tag, output);
-    }
-    output.add(ValueWithMetadata.of(outputElem).withKey(getKey()));
-  }
-
-  public <T> List<ValueWithMetadata<T>> getOutput(@SuppressWarnings("unused") TupleTag<T> tag) {
-    @SuppressWarnings({"unchecked", "rawtypes"}) // Cast not expressible without rawtypes
-    List<ValueWithMetadata<T>> typedOutput = (List) output;
-    return typedOutput;
-  }
-
-  public <T> List<ValueWithMetadata<T>> getSideOutput(TupleTag<T> tag) {
-    if (sideOutputs.containsKey(tag)) {
-      @SuppressWarnings({"unchecked", "rawtypes"}) // Cast not expressible without rawtypes
-      List<ValueWithMetadata<T>> typedOutput = (List) sideOutputs.get(tag);
-      return typedOutput;
-    } else {
-      return Lists.newArrayList();
-    }
-  }
-
-  /**
-   * {@link ExecutionContext.StepContext} used in direct mode.
-   */
-  public static class StepContext extends BaseExecutionContext.StepContext {
-
-    /** A map from each key to the state associated with it. */
-    private final Map<Object, InMemoryStateInternals<Object>> stateInternals = Maps.newHashMap();
-    private InMemoryStateInternals<Object> currentStateInternals = null;
-
-    private StepContext(ExecutionContext executionContext, String stepName, String transformName) {
-      super(executionContext, stepName, transformName);
-      switchKey(null);
-    }
-
-    public void switchKey(Object newKey) {
-      currentStateInternals = stateInternals.get(newKey);
-      if (currentStateInternals == null) {
-        currentStateInternals = InMemoryStateInternals.forKey(newKey);
-        stateInternals.put(newKey, currentStateInternals);
-      }
-    }
-
-    @Override
-    public StateInternals<Object> stateInternals() {
-      return checkNotNull(currentStateInternals);
-    }
-
-    @Override
-    public TimerInternals timerInternals() {
-      throw new UnsupportedOperationException("Direct mode cannot return timerInternals");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectSideInputReader.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectSideInputReader.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectSideInputReader.java
deleted file mode 100644
index ee8c922..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectSideInputReader.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-
-/**
- * Basic side input reader wrapping a {@link PTuple} of side input iterables. Encapsulates
- * conversion according to the {@link PCollectionView} and projection to a particular
- * window.
- */
-public class DirectSideInputReader implements SideInputReader {
-
-  private PTuple sideInputValues;
-
-  private DirectSideInputReader(PTuple sideInputValues) {
-    this.sideInputValues = sideInputValues;
-  }
-
-  public static DirectSideInputReader of(PTuple sideInputValues) {
-    return new DirectSideInputReader(sideInputValues);
-  }
-
-  @Override
-  public <T> boolean contains(PCollectionView<T> view) {
-    return sideInputValues.has(view.getTagInternal());
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return sideInputValues.isEmpty();
-  }
-
-  @Override
-  public <T> T get(PCollectionView<T> view, final BoundedWindow window) {
-    final TupleTag<Iterable<WindowedValue<?>>> tag = view.getTagInternal();
-    if (!sideInputValues.has(tag)) {
-      throw new IllegalArgumentException("calling getSideInput() with unknown view");
-    }
-
-    if (view.getWindowingStrategyInternal().getWindowFn() instanceof GlobalWindows) {
-      return view.fromIterableInternal(sideInputValues.get(tag));
-    } else {
-      return view.fromIterableInternal(
-          Iterables.filter(sideInputValues.get(tag),
-              new Predicate<WindowedValue<?>>() {
-                  @Override
-                  public boolean apply(WindowedValue<?> element) {
-                    return element.getWindows().contains(window);
-                  }
-                }));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnInfo.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnInfo.java
deleted file mode 100644
index 15a3a47..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnInfo.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-import java.io.Serializable;
-
-/**
- * Wrapper class holding the necessary information to serialize a DoFn.
- *
- * @param <InputT> the type of the (main) input elements of the DoFn
- * @param <OutputT> the type of the (main) output elements of the DoFn
- */
-public class DoFnInfo<InputT, OutputT> implements Serializable {
-  private final DoFn<InputT, OutputT> doFn;
-  private final WindowingStrategy<?, ?> windowingStrategy;
-  private final Iterable<PCollectionView<?>> sideInputViews;
-  private final Coder<InputT> inputCoder;
-
-  public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy) {
-    this.doFn = doFn;
-    this.windowingStrategy = windowingStrategy;
-    this.sideInputViews = null;
-    this.inputCoder = null;
-  }
-
-  public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy,
-                  Iterable<PCollectionView<?>> sideInputViews, Coder<InputT> inputCoder) {
-    this.doFn = doFn;
-    this.windowingStrategy = windowingStrategy;
-    this.sideInputViews = sideInputViews;
-    this.inputCoder = inputCoder;
-  }
-
-  public DoFn<InputT, OutputT> getDoFn() {
-    return doFn;
-  }
-
-  public WindowingStrategy<?, ?> getWindowingStrategy() {
-    return windowingStrategy;
-  }
-
-  public Iterable<PCollectionView<?>> getSideInputViews() {
-    return sideInputViews;
-  }
-
-  public Coder<InputT> getInputCoder() {
-    return inputCoder;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunner.java
deleted file mode 100644
index 51c3f39..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunner.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext;
-import com.google.cloud.dataflow.sdk.values.KV;
-
-/**
- * An wrapper interface that represents the execution of a {@link DoFn}.
- */
-public interface DoFnRunner<InputT, OutputT> {
-  /**
-   * Prepares and calls {@link DoFn#startBundle}.
-   */
-  public void startBundle();
-
-  /**
-   * Calls {@link DoFn#processElement} with a {@link ProcessContext} containing the current element.
-   */
-  public void processElement(WindowedValue<InputT> elem);
-
-  /**
-   * Calls {@link DoFn#finishBundle} and performs additional tasks, such as
-   * flushing in-memory states.
-   */
-  public void finishBundle();
-
-  /**
-   * An internal interface for signaling that a {@link DoFn} requires late data dropping.
-   */
-  public interface ReduceFnExecutor<K, InputT, OutputT, W> {
-    /**
-     * Gets this object as a {@link DoFn}.
-     *
-     * Most implementors of this interface are expected to be {@link DoFn} instances, and will
-     * return themselves.
-     */
-    DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn();
-
-    /**
-     * Returns an aggregator that tracks elements that are dropped due to being late.
-     */
-    Aggregator<Long, Long> getDroppedDueToLatenessAggregator();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunnerBase.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunnerBase.java
deleted file mode 100644
index 04ec59f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunnerBase.java
+++ /dev/null
@@ -1,558 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager;
-import com.google.cloud.dataflow.sdk.util.ExecutionContext.StepContext;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.joda.time.Instant;
-import org.joda.time.format.PeriodFormat;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A base implementation of {@link DoFnRunner}.
- *
- * <p> Sub-classes should override {@link #invokeProcessElement}.
- */
-public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
-
-  /** The DoFn being run. */
-  public final DoFn<InputT, OutputT> fn;
-
-  /** The context used for running the DoFn. */
-  public final DoFnContext<InputT, OutputT> context;
-
-  protected DoFnRunnerBase(
-      PipelineOptions options,
-      DoFn<InputT, OutputT> fn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      CounterSet.AddCounterMutator addCounterMutator,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    this.fn = fn;
-    this.context = new DoFnContext<>(
-        options,
-        fn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        addCounterMutator,
-        windowingStrategy == null ? null : windowingStrategy.getWindowFn());
-  }
-
-  /**
-   * An implementation of {@code OutputManager} using simple lists, for testing and in-memory
-   * contexts such as the {@link DirectPipelineRunner}.
-   */
-  public static class ListOutputManager implements OutputManager {
-
-    private Map<TupleTag<?>, List<WindowedValue<?>>> outputLists = Maps.newHashMap();
-
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      List<WindowedValue<T>> outputList = (List) outputLists.get(tag);
-
-      if (outputList == null) {
-        outputList = Lists.newArrayList();
-        @SuppressWarnings({"rawtypes", "unchecked"})
-        List<WindowedValue<?>> untypedList = (List) outputList;
-        outputLists.put(tag, untypedList);
-      }
-
-      outputList.add(output);
-    }
-
-    public <T> List<WindowedValue<T>> getOutput(TupleTag<T> tag) {
-      // Safe cast by design, inexpressible in Java without rawtypes
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      List<WindowedValue<T>> outputList = (List) outputLists.get(tag);
-      return (outputList != null) ? outputList : Collections.<WindowedValue<T>>emptyList();
-    }
-  }
-
-  @Override
-  public void startBundle() {
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.startBundle(context);
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
-    }
-  }
-
-  @Override
-  public void processElement(WindowedValue<InputT> elem) {
-    if (elem.getWindows().size() <= 1
-        || (!RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
-            && context.sideInputReader.isEmpty())) {
-      invokeProcessElement(elem);
-    } else {
-      // We could modify the windowed value (and the processContext) to
-      // avoid repeated allocations, but this is more straightforward.
-      for (BoundedWindow window : elem.getWindows()) {
-        invokeProcessElement(WindowedValue.of(
-            elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
-      }
-    }
-  }
-
-  /**
-   * Invokes {@link DoFn#processElement} after certain pre-processings has been done in
-   * {@link DoFnRunnerBase#processElement}.
-   */
-  protected abstract void invokeProcessElement(WindowedValue<InputT> elem);
-
-  @Override
-  public void finishBundle() {
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.finishBundle(context);
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
-    }
-  }
-
-  /**
-   * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}.
-   *
-   * @param <InputT> the type of the DoFn's (main) input elements
-   * @param <OutputT> the type of the DoFn's (main) output elements
-   */
-  private static class DoFnContext<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.Context {
-    private static final int MAX_SIDE_OUTPUTS = 1000;
-
-    final PipelineOptions options;
-    final DoFn<InputT, OutputT> fn;
-    final SideInputReader sideInputReader;
-    final OutputManager outputManager;
-    final TupleTag<OutputT> mainOutputTag;
-    final StepContext stepContext;
-    final CounterSet.AddCounterMutator addCounterMutator;
-    final WindowFn<?, ?> windowFn;
-
-    /**
-     * The set of known output tags, some of which may be undeclared, so we can throw an
-     * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
-     */
-    private Set<TupleTag<?>> outputTags;
-
-    public DoFnContext(PipelineOptions options,
-                       DoFn<InputT, OutputT> fn,
-                       SideInputReader sideInputReader,
-                       OutputManager outputManager,
-                       TupleTag<OutputT> mainOutputTag,
-                       List<TupleTag<?>> sideOutputTags,
-                       StepContext stepContext,
-                       CounterSet.AddCounterMutator addCounterMutator,
-                       WindowFn<?, ?> windowFn) {
-      fn.super();
-      this.options = options;
-      this.fn = fn;
-      this.sideInputReader = sideInputReader;
-      this.outputManager = outputManager;
-      this.mainOutputTag = mainOutputTag;
-      this.outputTags = Sets.newHashSet();
-
-      outputTags.add(mainOutputTag);
-      for (TupleTag<?> sideOutputTag : sideOutputTags) {
-        outputTags.add(sideOutputTag);
-      }
-
-      this.stepContext = stepContext;
-      this.addCounterMutator = addCounterMutator;
-      this.windowFn = windowFn;
-      super.setupDelegateAggregators();
-    }
-
-    //////////////////////////////////////////////////////////////////////////////
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
-        T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
-      final Instant inputTimestamp = timestamp;
-
-      if (timestamp == null) {
-        timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      }
-
-      if (windows == null) {
-        try {
-          // The windowFn can never succeed at accessing the element, so its type does not
-          // matter here
-          @SuppressWarnings("unchecked")
-          WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
-          windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() {
-            @Override
-            public Object element() {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input element when none was available");
-            }
-
-            @Override
-            public Instant timestamp() {
-              if (inputTimestamp == null) {
-                throw new UnsupportedOperationException(
-                    "WindowFn attempted to access input timestamp when none was available");
-              }
-              return inputTimestamp;
-            }
-
-            @Override
-            public Collection<? extends BoundedWindow> windows() {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input windows when none were available");
-            }
-          });
-        } catch (Exception e) {
-          throw UserCodeException.wrap(e);
-        }
-      }
-
-      return WindowedValue.of(output, timestamp, windows, pane);
-    }
-
-    public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-      if (!sideInputReader.contains(view)) {
-        throw new IllegalArgumentException("calling sideInput() with unknown view");
-      }
-      BoundedWindow sideInputWindow =
-          view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
-      return sideInputReader.get(view, sideInputWindow);
-    }
-
-    void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
-    }
-
-    void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
-      outputManager.output(mainOutputTag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteOutput(windowedElem);
-      }
-    }
-
-    protected <T> void sideOutputWindowedValue(TupleTag<T> tag,
-                                               T output,
-                                               Instant timestamp,
-                                               Collection<? extends BoundedWindow> windows,
-                                               PaneInfo pane) {
-      sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
-    }
-
-    protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
-      if (!outputTags.contains(tag)) {
-        // This tag wasn't declared nor was it seen before during this execution.
-        // Thus, this must be a new, undeclared and unconsumed output.
-        // To prevent likely user errors, enforce the limit on the number of side
-        // outputs.
-        if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
-          throw new IllegalArgumentException(
-              "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
-        }
-        outputTags.add(tag);
-      }
-
-      outputManager.output(tag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteSideOutput(tag, windowedElem);
-      }
-    }
-
-    // Following implementations of output, outputWithTimestamp, and sideOutput
-    // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by
-    // ProcessContext's versions in DoFn.processElement.
-    @Override
-    public void output(OutputT output) {
-      outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      Preconditions.checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
-      sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      Preconditions.checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
-      sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
-    }
-
-    private String generateInternalAggregatorName(String userName) {
-      boolean system = fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
-      return (system ? "" : "user-") + stepContext.getStepName() + "-" + userName;
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      Preconditions.checkNotNull(combiner,
-          "Combiner passed to createAggregator cannot be null");
-      return new CounterAggregator<>(generateInternalAggregatorName(name),
-          combiner, addCounterMutator);
-    }
-  }
-
-  /**
-   * Returns a new {@code DoFn.ProcessContext} for the given element.
-   */
-  protected DoFn<InputT, OutputT>.ProcessContext createProcessContext(WindowedValue<InputT> elem) {
-    return new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
-  }
-
-  protected RuntimeException wrapUserCodeException(Throwable t) {
-    throw UserCodeException.wrapIf(!isSystemDoFn(), t);
-  }
-
-  private boolean isSystemDoFn() {
-    return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
-  }
-
-  /**
-   * A concrete implementation of {@code DoFn.ProcessContext} used for
-   * running a {@link DoFn} over a single element.
-   *
-   * @param <InputT> the type of the DoFn's (main) input elements
-   * @param <OutputT> the type of the DoFn's (main) output elements
-   */
-  static class DoFnProcessContext<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.ProcessContext {
-
-
-    final DoFn<InputT, OutputT> fn;
-    final DoFnContext<InputT, OutputT> context;
-    final WindowedValue<InputT> windowedValue;
-
-    public DoFnProcessContext(DoFn<InputT, OutputT> fn,
-                              DoFnContext<InputT, OutputT> context,
-                              WindowedValue<InputT> windowedValue) {
-      fn.super();
-      this.fn = fn;
-      this.context = context;
-      this.windowedValue = windowedValue;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public InputT element() {
-      return windowedValue.getValue();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      Preconditions.checkNotNull(view, "View passed to sideInput cannot be null");
-      Iterator<? extends BoundedWindow> windowIter = windows().iterator();
-      BoundedWindow window;
-      if (!windowIter.hasNext()) {
-        if (context.windowFn instanceof GlobalWindows) {
-          // TODO: Remove this once GroupByKeyOnly no longer outputs elements
-          // without windows
-          window = GlobalWindow.INSTANCE;
-        } else {
-          throw new IllegalStateException(
-              "sideInput called when main input element is not in any windows");
-        }
-      } else {
-        window = windowIter.next();
-        if (windowIter.hasNext()) {
-          throw new IllegalStateException(
-              "sideInput called when main input element is in multiple windows");
-        }
-      }
-      return context.sideInput(view, window);
-    }
-
-    @Override
-    public BoundedWindow window() {
-      if (!(fn instanceof RequiresWindowAccess)) {
-        throw new UnsupportedOperationException(
-            "window() is only available in the context of a DoFn marked as RequiresWindow.");
-      }
-      return Iterables.getOnlyElement(windows());
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return windowedValue.getPane();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.outputWindowedValue(windowedValue.withValue(output));
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      checkTimestamp(timestamp);
-      context.outputWindowedValue(output, timestamp,
-          windowedValue.getWindows(), windowedValue.getPane());
-    }
-
-    void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      context.outputWindowedValue(output, timestamp, windows, pane);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      Preconditions.checkNotNull(tag, "Tag passed to sideOutput cannot be null");
-      context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      Preconditions.checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
-      checkTimestamp(timestamp);
-      context.sideOutputWindowedValue(
-          tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
-    }
-
-    @Override
-    public Instant timestamp() {
-      return windowedValue.getTimestamp();
-    }
-
-    public Collection<? extends BoundedWindow> windows() {
-      return windowedValue.getWindows();
-    }
-
-    private void checkTimestamp(Instant timestamp) {
-      if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
-        throw new IllegalArgumentException(String.format(
-            "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
-            + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
-            + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
-            timestamp, windowedValue.getTimestamp(),
-            PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
-      }
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return new WindowingInternals<InputT, OutputT>() {
-        @Override
-        public void outputWindowedValue(OutputT output, Instant timestamp,
-            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-          context.outputWindowedValue(output, timestamp, windows, pane);
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return windowedValue.getWindows();
-        }
-
-        @Override
-        public PaneInfo pane() {
-          return windowedValue.getPane();
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return context.stepContext.timerInternals();
-        }
-
-        @Override
-        public <T> void writePCollectionViewData(
-            TupleTag<?> tag,
-            Iterable<WindowedValue<T>> data,
-            Coder<T> elemCoder) throws IOException {
-          @SuppressWarnings("unchecked")
-          Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder();
-
-          context.stepContext.writePCollectionViewData(
-              tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)),
-              window(), windowCoder);
-        }
-
-        @Override
-        public StateInternals<?> stateInternals() {
-          return context.stepContext.stateInternals();
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-          return context.sideInput(view, mainInputWindow);
-        }
-      };
-    }
-
-    @Override
-    protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
-        createAggregatorInternal(
-            String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunners.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunners.java
deleted file mode 100644
index d56b36e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunners.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.DoFnRunner.ReduceFnExecutor;
-import com.google.cloud.dataflow.sdk.util.ExecutionContext.StepContext;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet.AddCounterMutator;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import java.util.List;
-
-/**
- * Static utility methods that provide {@link DoFnRunner} implementations.
- */
-public class DoFnRunners {
-  /**
-   * Information about how to create output receivers and output to them.
-   */
-  public interface OutputManager {
-    /**
-     * Outputs a single element to the receiver indicated by the given {@link TupleTag}.
-     */
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output);
-  }
-
-  /**
-   * Returns a basic implementation of {@link DoFnRunner} that works for most {@link DoFn DoFns}.
-   *
-   * <p>It invokes {@link DoFn#processElement} for each input.
-   */
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
-      PipelineOptions options,
-      DoFn<InputT, OutputT> fn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      CounterSet.AddCounterMutator addCounterMutator,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    return new SimpleDoFnRunner<>(
-        options,
-        fn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        addCounterMutator,
-        windowingStrategy);
-  }
-
-  /**
-   * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
-   *
-   * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}.
-   */
-  public static <K, InputT, OutputT, W extends BoundedWindow>
-      DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
-          PipelineOptions options,
-          ReduceFnExecutor<K, InputT, OutputT, W> reduceFnExecutor,
-          SideInputReader sideInputReader,
-          OutputManager outputManager,
-          TupleTag<KV<K, OutputT>> mainOutputTag,
-          List<TupleTag<?>> sideOutputTags,
-          StepContext stepContext,
-          CounterSet.AddCounterMutator addCounterMutator,
-          WindowingStrategy<?, W> windowingStrategy) {
-    DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> simpleDoFnRunner =
-        simpleRunner(
-            options,
-            reduceFnExecutor.asDoFn(),
-            sideInputReader,
-            outputManager,
-            mainOutputTag,
-            sideOutputTags,
-            stepContext,
-            addCounterMutator,
-            windowingStrategy);
-    return new LateDataDroppingDoFnRunner<>(
-        simpleDoFnRunner,
-        windowingStrategy,
-        stepContext.timerInternals(),
-        reduceFnExecutor.getDroppedDueToLatenessAggregator());
-  }
-
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
-      PipelineOptions options,
-      DoFn<InputT, OutputT> doFn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      AddCounterMutator addCounterMutator,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    if (doFn instanceof ReduceFnExecutor) {
-      @SuppressWarnings("rawtypes")
-      ReduceFnExecutor fn = (ReduceFnExecutor) doFn;
-      @SuppressWarnings({"unchecked", "cast", "rawtypes"})
-      DoFnRunner<InputT, OutputT> runner = (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner(
-          options,
-          fn,
-          sideInputReader,
-          outputManager,
-          (TupleTag) mainOutputTag,
-          sideOutputTags,
-          stepContext,
-          addCounterMutator,
-          (WindowingStrategy) windowingStrategy);
-      return runner;
-    }
-    return simpleRunner(
-        options,
-        doFn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        addCounterMutator,
-        windowingStrategy);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutableTrigger.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutableTrigger.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutableTrigger.java
deleted file mode 100644
index 22a3762..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutableTrigger.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger.OnceTrigger;
-import com.google.common.base.Preconditions;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A wrapper around a trigger used during execution. While an actual trigger may appear multiple
- * times (both in the same trigger expression and in other trigger expressions), the
- * {@code ExecutableTrigger} wrapped around them forms a tree (only one occurrence).
- *
- * @param <W> {@link BoundedWindow} subclass used to represent the windows used.
- */
-public class ExecutableTrigger<W extends BoundedWindow> implements Serializable {
-
-  /** Store the index assigned to this trigger. */
-  private final int triggerIndex;
-  private final int firstIndexAfterSubtree;
-  private final List<ExecutableTrigger<W>> subTriggers = new ArrayList<>();
-  private final Trigger<W> trigger;
-
-  public static <W extends BoundedWindow> ExecutableTrigger<W> create(Trigger<W> trigger) {
-    return create(trigger, 0);
-  }
-
-  private static <W extends BoundedWindow> ExecutableTrigger<W> create(
-      Trigger<W> trigger, int nextUnusedIndex) {
-    if (trigger instanceof OnceTrigger) {
-      return new ExecutableOnceTrigger<W>((OnceTrigger<W>) trigger, nextUnusedIndex);
-    } else {
-      return new ExecutableTrigger<W>(trigger, nextUnusedIndex);
-    }
-  }
-
-  public static <W extends BoundedWindow> ExecutableTrigger<W> createForOnceTrigger(
-      OnceTrigger<W> trigger, int nextUnusedIndex) {
-    return new ExecutableOnceTrigger<W>(trigger, nextUnusedIndex);
-  }
-
-  private ExecutableTrigger(Trigger<W> trigger, int nextUnusedIndex) {
-    this.trigger = Preconditions.checkNotNull(trigger, "trigger must not be null");
-    this.triggerIndex = nextUnusedIndex++;
-
-    if (trigger.subTriggers() != null) {
-      for (Trigger<W> subTrigger : trigger.subTriggers()) {
-        ExecutableTrigger<W> subExecutable = create(subTrigger, nextUnusedIndex);
-        subTriggers.add(subExecutable);
-        nextUnusedIndex = subExecutable.firstIndexAfterSubtree;
-      }
-    }
-    firstIndexAfterSubtree = nextUnusedIndex;
-  }
-
-  public List<ExecutableTrigger<W>> subTriggers() {
-    return subTriggers;
-  }
-
-  @Override
-  public String toString() {
-    return trigger.toString();
-  }
-
-  /**
-   * Return the underlying trigger specification corresponding to this {@code ExecutableTrigger}.
-   */
-  public Trigger<W> getSpec() {
-    return trigger;
-  }
-
-  public int getTriggerIndex() {
-    return triggerIndex;
-  }
-
-  public final int getFirstIndexAfterSubtree() {
-    return firstIndexAfterSubtree;
-  }
-
-  public boolean isCompatible(ExecutableTrigger<W> other) {
-    return trigger.isCompatible(other.trigger);
-  }
-
-  public ExecutableTrigger<W> getSubTriggerContaining(int index) {
-    Preconditions.checkNotNull(subTriggers);
-    Preconditions.checkState(index > triggerIndex && index < firstIndexAfterSubtree,
-        "Cannot find sub-trigger containing index not in this tree.");
-    ExecutableTrigger<W> previous = null;
-    for (ExecutableTrigger<W> subTrigger : subTriggers) {
-      if (index < subTrigger.triggerIndex) {
-        return previous;
-      }
-      previous = subTrigger;
-    }
-    return previous;
-  }
-
-  /**
-   * Invoke the {@link Trigger#onElement} method for this trigger, ensuring that the bits are
-   * properly updated if the trigger finishes.
-   */
-  public void invokeOnElement(Trigger<W>.OnElementContext c) throws Exception {
-    trigger.onElement(c.forTrigger(this));
-  }
-
-  /**
-   * Invoke the {@link Trigger#onMerge} method for this trigger, ensuring that the bits are properly
-   * updated.
-   */
-  public void invokeOnMerge(Trigger<W>.OnMergeContext c) throws Exception {
-    Trigger<W>.OnMergeContext subContext = c.forTrigger(this);
-    trigger.onMerge(subContext);
-  }
-
-  public boolean invokeShouldFire(Trigger<W>.TriggerContext c) throws Exception {
-    return trigger.shouldFire(c.forTrigger(this));
-  }
-
-  public void invokeOnFire(Trigger<W>.TriggerContext c) throws Exception {
-    trigger.onFire(c.forTrigger(this));
-  }
-
-  /**
-   * Invoke clear for the current this trigger.
-   */
-  public void invokeClear(Trigger<W>.TriggerContext c) throws Exception {
-    trigger.clear(c.forTrigger(this));
-  }
-
-  /**
-   * {@link ExecutableTrigger} that enforces the fact that the trigger should always FIRE_AND_FINISH
-   * and never just FIRE.
-   */
-  private static class ExecutableOnceTrigger<W extends BoundedWindow> extends ExecutableTrigger<W> {
-
-    public ExecutableOnceTrigger(OnceTrigger<W> trigger, int nextUnusedIndex) {
-      super(trigger, nextUnusedIndex);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutionContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutionContext.java
deleted file mode 100644
index cff5b95..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutionContext.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import java.io.IOException;
-import java.util.Collection;
-
-/**
- * Context for the current execution. This is guaranteed to exist during processing,
- * but does not necessarily persist between different batches of work.
- */
-public interface ExecutionContext {
-  /**
-   * Returns the {@link StepContext} associated with the given step.
-   */
-  StepContext getOrCreateStepContext(
-      String stepName, String transformName, StateSampler stateSampler);
-
-  /**
-   * Returns a collection view of all of the {@link StepContext}s.
-   */
-  Collection<? extends StepContext> getAllStepContexts();
-
-  /**
-   * Hook for subclasses to implement that will be called whenever
-   * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#output}
-   * is called.
-   */
-  void noteOutput(WindowedValue<?> output);
-
-  /**
-   * Hook for subclasses to implement that will be called whenever
-   * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#sideOutput}
-   * is called.
-   */
-  void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
-
-  /**
-   * Per-step, per-key context used for retrieving state.
-   */
-  public interface StepContext {
-
-    /**
-     * The name of the step.
-     */
-    String getStepName();
-
-    /**
-     * The name of the transform for the step.
-     */
-    String getTransformName();
-
-    /**
-     * Hook for subclasses to implement that will be called whenever
-     * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#output}
-     * is called.
-     */
-    void noteOutput(WindowedValue<?> output);
-
-    /**
-     * Hook for subclasses to implement that will be called whenever
-     * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#sideOutput}
-     * is called.
-     */
-    void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
-
-    /**
-     * Writes the given {@code PCollectionView} data to a globally accessible location.
-     */
-    <T, W extends BoundedWindow> void writePCollectionViewData(
-        TupleTag<?> tag,
-        Iterable<WindowedValue<T>> data,
-        Coder<Iterable<WindowedValue<T>>> dataCoder,
-        W window,
-        Coder<W> windowCoder)
-            throws IOException;
-
-    StateInternals<?> stateInternals();
-
-    TimerInternals timerInternals();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayInputStream.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayInputStream.java
deleted file mode 100644
index dff5fd1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayInputStream.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-
-/**
- * {@link ByteArrayInputStream} that allows accessing the entire internal buffer without copying.
- */
-public class ExposedByteArrayInputStream extends ByteArrayInputStream{
-
-  public ExposedByteArrayInputStream(byte[] buf) {
-    super(buf);
-  }
-
-  /** Read all remaining bytes.
-   * @throws IOException */
-  public byte[] readAll() throws IOException {
-    if (pos == 0 && count == buf.length) {
-      pos = count;
-      return buf;
-    }
-    byte[] ret = new byte[count - pos];
-    super.read(ret);
-    return ret;
-  }
-
-  @Override
-  public void close() {
-    try {
-      super.close();
-    } catch (IOException exn) {
-      throw new RuntimeException("Unexpected IOException closing ByteArrayInputStream", exn);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayOutputStream.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayOutputStream.java
deleted file mode 100644
index d8e4d50..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayOutputStream.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-/**
- * {@link ByteArrayOutputStream} special cased to treat writes of a single byte-array specially.
- * When calling {@link #toByteArray()} after writing only one {@code byte[]} using
- * {@link #writeAndOwn(byte[])}, it will return that array directly.
- */
-public class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
-
-  private byte[] swappedBuffer;
-
-  /**
-   * If true, this stream doesn't allow direct access to the passed in byte-array. It behaves just
-   * like a normal {@link ByteArrayOutputStream}.
-   *
-   * <p>It is set to true after any write operations other than the first call to
-   * {@link #writeAndOwn(byte[])}.
-   */
-  private boolean isFallback = false;
-
-  /**
-   * Fall back to the behavior of a normal {@link ByteArrayOutputStream}.
-   */
-  private void fallback() {
-    isFallback = true;
-    if (swappedBuffer != null) {
-      // swappedBuffer != null means buf is actually provided by the caller of writeAndOwn(),
-      // while swappedBuffer is the original buffer.
-      // Recover the buffer and copy the bytes from buf.
-      byte[] tempBuffer = buf;
-      count = 0;
-      buf = swappedBuffer;
-      super.write(tempBuffer, 0, tempBuffer.length);
-      swappedBuffer = null;
-    }
-  }
-
-  /**
-   * Write {@code b} to the stream and take the ownership of {@code b}.
-   * If the stream is empty, {@code b} itself will be used as the content of the stream and
-   * no content copy will be involved.
-   * <p><i>Note: After passing any byte array to this method, it must not be modified again.</i>
-   *
-   * @throws IOException
-   */
-  public void writeAndOwn(byte[] b) throws IOException {
-    if (b.length == 0) {
-      return;
-    }
-    if (count == 0) {
-      // Optimized first-time whole write.
-      // The original buffer will be swapped to swappedBuffer, while the input b is used as buf.
-      swappedBuffer = buf;
-      buf = b;
-      count = b.length;
-    } else {
-      fallback();
-      super.write(b);
-    }
-  }
-
-  @Override
-  public void write(byte[] b, int off, int len) {
-    fallback();
-    super.write(b, off, len);
-  }
-
-  @Override
-  public void write(int b) {
-    fallback();
-    super.write(b);
-  }
-
-  @Override
-  public byte[] toByteArray() {
-    // Note: count == buf.length is not a correct criteria to "return buf;", because the internal
-    // buf may be reused after reset().
-    if (!isFallback && count > 0) {
-      return buf;
-    } else {
-      return super.toByteArray();
-    }
-  }
-
-  @Override
-  public void reset() {
-    if (count == 0) {
-      return;
-    }
-    count = 0;
-    if (isFallback) {
-      isFallback = false;
-    } else {
-      buf = swappedBuffer;
-      swappedBuffer = null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FileIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FileIOChannelFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FileIOChannelFactory.java
deleted file mode 100644
index 77d0b83..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FileIOChannelFactory.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.PathMatcher;
-import java.nio.file.Paths;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.regex.Matcher;
-
-/**
- * Implements IOChannelFactory for local files.
- */
-public class FileIOChannelFactory implements IOChannelFactory {
-  private static final Logger LOG = LoggerFactory.getLogger(FileIOChannelFactory.class);
-
-  // This implementation only allows for wildcards in the file name.
-  // The directory portion must exist as-is.
-  @Override
-  public Collection<String> match(String spec) throws IOException {
-    File file = new File(spec);
-
-    File parent = file.getAbsoluteFile().getParentFile();
-    if (!parent.exists()) {
-      throw new IOException("Unable to find parent directory of " + spec);
-    }
-
-    // Method getAbsolutePath() on Windows platform may return something like
-    // "c:\temp\file.txt". FileSystem.getPathMatcher() call below will treat
-    // '\' (backslash) as an escape character, instead of a directory
-    // separator. Replacing backslash with double-backslash solves the problem.
-    // We perform the replacement on all platforms, even those that allow
-    // backslash as a part of the filename, because Globs.toRegexPattern will
-    // eat one backslash.
-    String pathToMatch = file.getAbsolutePath().replaceAll(Matcher.quoteReplacement("\\"),
-                                                           Matcher.quoteReplacement("\\\\"));
-
-    final PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathToMatch);
-
-    Iterable<File> files = com.google.common.io.Files.fileTreeTraverser().preOrderTraversal(parent);
-    Iterable<File> matchedFiles = Iterables.filter(files,
-        Predicates.and(
-            com.google.common.io.Files.isFile(),
-            new Predicate<File>() {
-              @Override
-              public boolean apply(File input) {
-                return matcher.matches(input.toPath());
-              }
-        }));
-
-    List<String> result = new LinkedList<>();
-    for (File match : matchedFiles) {
-      result.add(match.getPath());
-    }
-
-    return result;
-  }
-
-  @Override
-  public ReadableByteChannel open(String spec) throws IOException {
-    LOG.debug("opening file {}", spec);
-    @SuppressWarnings("resource") // The caller is responsible for closing the channel.
-    FileInputStream inputStream = new FileInputStream(spec);
-    // Use this method for creating the channel (rather than new FileChannel) so that we get
-    // regular FileNotFoundException. Closing the underyling channel will close the inputStream.
-    return inputStream.getChannel();
-  }
-
-  @Override
-  public WritableByteChannel create(String spec, String mimeType)
-      throws IOException {
-    LOG.debug("creating file {}", spec);
-    File file = new File(spec);
-    if (file.getAbsoluteFile().getParentFile() != null
-        && !file.getAbsoluteFile().getParentFile().exists()
-        && !file.getAbsoluteFile().getParentFile().mkdirs()) {
-      throw new IOException("Unable to create parent directories for '" + spec + "'");
-    }
-    return Channels.newChannel(
-        new BufferedOutputStream(new FileOutputStream(file)));
-  }
-
-  @Override
-  public long getSizeBytes(String spec) throws IOException {
-    try {
-      return Files.size(FileSystems.getDefault().getPath(spec));
-    } catch (NoSuchFileException e) {
-      throw new FileNotFoundException(e.getReason());
-    }
-  }
-
-  @Override
-  public boolean isReadSeekEfficient(String spec) throws IOException {
-    return true;
-  }
-
-  @Override
-  public String resolve(String path, String other) throws IOException {
-    return Paths.get(path).resolve(other).toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggers.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggers.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggers.java
deleted file mode 100644
index e75be23..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggers.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-/**
- * A mutable set which tracks whether any particular {@link ExecutableTrigger} is
- * finished.
- */
-public interface FinishedTriggers {
-  /**
-   * Returns {@code true} if the trigger is finished.
-   */
-  public boolean isFinished(ExecutableTrigger<?> trigger);
-
-  /**
-   * Sets the fact that the trigger is finished.
-   */
-  public void setFinished(ExecutableTrigger<?> trigger, boolean value);
-
-  /**
-   * Sets the trigger and all of its subtriggers to unfinished.
-   */
-  public void clearRecursively(ExecutableTrigger<?> trigger);
-
-  /**
-   * Create an independent copy of this mutable {@link FinishedTriggers}.
-   */
-  public FinishedTriggers copy();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersBitSet.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersBitSet.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersBitSet.java
deleted file mode 100644
index 09f7af7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersBitSet.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import java.util.BitSet;
-
-/**
- * A {@link FinishedTriggers} implementation based on an underlying {@link BitSet}.
- */
-public class FinishedTriggersBitSet implements FinishedTriggers {
-
-  private final BitSet bitSet;
-
-  private FinishedTriggersBitSet(BitSet bitSet) {
-    this.bitSet = bitSet;
-  }
-
-  public static FinishedTriggersBitSet emptyWithCapacity(int capacity) {
-    return new FinishedTriggersBitSet(new BitSet(capacity));
-  }
-
-  public static FinishedTriggersBitSet fromBitSet(BitSet bitSet) {
-    return new FinishedTriggersBitSet(bitSet);
-  }
-
-  /**
-   * Returns the underlying {@link BitSet} for this {@link FinishedTriggersBitSet}.
-   */
-  public BitSet getBitSet() {
-    return bitSet;
-  }
-
-  @Override
-  public boolean isFinished(ExecutableTrigger<?> trigger) {
-    return bitSet.get(trigger.getTriggerIndex());
-  }
-
-  @Override
-  public void setFinished(ExecutableTrigger<?> trigger, boolean value) {
-    bitSet.set(trigger.getTriggerIndex(), value);
-  }
-
-  @Override
-  public void clearRecursively(ExecutableTrigger<?> trigger) {
-    bitSet.clear(trigger.getTriggerIndex(), trigger.getFirstIndexAfterSubtree());
-  }
-
-  @Override
-  public FinishedTriggersBitSet copy() {
-    return new FinishedTriggersBitSet((BitSet) bitSet.clone());
-  }
-}
-
-