You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:35 UTC

[11/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
deleted file mode 100644
index 96629b1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.MapCoder;
-import com.google.cloud.dataflow.sdk.coders.SetCoder;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.cloud.dataflow.sdk.util.state.StateTags;
-import com.google.cloud.dataflow.sdk.util.state.ValueState;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * An {@link ActiveWindowSet} for merging {@link WindowFn} implementations.
- *
- * <p>The underlying notion of {@link MergingActiveWindowSet} is that of representing equivalence
- * classes of merged windows as a mapping from the merged "super-window" to a set of
- * <i>state address</i> windows in which some state has been persisted. The mapping need not
- * contain EPHEMERAL windows, because they are created and merged without any persistent state.
- * Each window must be a state address window for at most one window, so the mapping is
- * invertible.
- *
- * <p>The states of a non-expired window are treated as follows:
- *
- * <ul>
- *   <li><b>NEW</b>: a NEW window has an empty set of associated state address windows.</li>
- *   <li><b>ACTIVE</b>: an ACTIVE window will be associated with some nonempty set of state
- *       address windows. If the window has not merged, this will necessarily be the singleton set
- *       containing just itself, but it is not required that an ACTIVE window be amongst its
- *       state address windows.</li>
- *   <li><b>MERGED</b>: a MERGED window will be in the set of associated windows for some
- *       other window - that window is retrieved via {@link #representative} (this reverse
- *       association is implemented in O(1) time).</li>
- *   <li><b>EPHEMERAL</b>: EPHEMERAL windows are not persisted but are tracked transiently;
- *       an EPHEMERAL window must be registered with this {@link ActiveWindowSet} by a call
- *       to {@link #recordMerge} prior to any request for a {@link #representative}.</li>
- * </ul>
- *
- * <p>To illustrate why an ACTIVE window need not be amongst its own state address windows,
- * consider two active windows W1 and W2 that are merged to form W12. Further writes may be
- * applied to either of W1 or W2, since a read of W12 implies reading both W1 and W2 and merging
- * their results. Hence W12 need not have state directly associated with it.
- */
-public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
-  private final WindowFn<Object, W> windowFn;
-  private final Map<W, Set<W>> activeWindowToStateAddressWindows;
-
-  /**
-   * As above, but only for EPHEMERAL windows. Does not need to be persisted.
-   */
-  private final Map<W, Set<W>> activeWindowToEphemeralWindows;
-
-  /**
-   * A map from window to the ACTIVE window it has been merged into.
-   *
-   * <p>Does not need to be persisted.
-   *
-   * <ul>
-   * <li>Key window may be ACTIVE, MERGED or EPHEMERAL.
-   * <li>ACTIVE windows map to themselves.
-   * <li>If W1 maps to W2 then W2 is in {@link #activeWindowToStateAddressWindows}.
-   * <li>If W1 = W2 then W1 is ACTIVE. If W1 is in the state address window set for W2 then W1 is
-   * MERGED. Otherwise W1 is EPHEMERAL.
-   * </ul>
-   */
-  private final Map<W, W> windowToActiveWindow;
-
-  /**
-   * Deep clone of {@link #activeWindowToStateAddressWindows} as of last commit.
-   *
-   * <p>Used to avoid writing to state if no changes have been made during the work unit.
-   */
-  private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
-
-  /**
-   * Handle representing our state in the backend.
-   */
-  private final ValueState<Map<W, Set<W>>> valueState;
-
-  public MergingActiveWindowSet(WindowFn<Object, W> windowFn, StateInternals<?> state) {
-    this.windowFn = windowFn;
-
-    StateTag<Object, ValueState<Map<W, Set<W>>>> mergeTreeAddr =
-        StateTags.makeSystemTagInternal(StateTags.value(
-            "tree", MapCoder.of(windowFn.windowCoder(), SetCoder.of(windowFn.windowCoder()))));
-    valueState = state.state(StateNamespaces.global(), mergeTreeAddr);
-    // Little use trying to prefetch this state since the ReduceFnRunner is stymied until it is
-    // available.
-    activeWindowToStateAddressWindows = emptyIfNull(valueState.read());
-    activeWindowToEphemeralWindows = new HashMap<>();
-    originalActiveWindowToStateAddressWindows = deepCopy(activeWindowToStateAddressWindows);
-    windowToActiveWindow = invert(activeWindowToStateAddressWindows);
-  }
-
-  @Override
-  public void removeEphemeralWindows() {
-    for (Map.Entry<W, Set<W>> entry : activeWindowToEphemeralWindows.entrySet()) {
-      for (W ephemeral : entry.getValue()) {
-        windowToActiveWindow.remove(ephemeral);
-      }
-    }
-    activeWindowToEphemeralWindows.clear();
-  }
-
-  @Override
-  public void persist() {
-    if (activeWindowToStateAddressWindows.isEmpty()) {
-      // Force all persistent state to disappear.
-      valueState.clear();
-      return;
-    }
-    if (activeWindowToStateAddressWindows.equals(originalActiveWindowToStateAddressWindows)) {
-      // No change.
-      return;
-    }
-    // All NEW windows must have been accounted for.
-    for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
-      Preconditions.checkState(
-          !entry.getValue().isEmpty(), "Cannot persist NEW window %s", entry.getKey());
-    }
-    // Should be no EPHEMERAL windows.
-    Preconditions.checkState(
-        activeWindowToEphemeralWindows.isEmpty(), "Unexpected EPHEMERAL windows before persist");
-
-    valueState.write(activeWindowToStateAddressWindows);
-    // No need to update originalActiveWindowToStateAddressWindows since this object is about to
-    // become garbage.
-  }
-
-  @Override
-  @Nullable
-  public W representative(W window) {
-    return windowToActiveWindow.get(window);
-  }
-
-  @Override
-  public Set<W> getActiveWindows() {
-    return activeWindowToStateAddressWindows.keySet();
-  }
-
-  @Override
-  public boolean isActive(W window) {
-    return activeWindowToStateAddressWindows.containsKey(window);
-  }
-
-  @Override
-  public void addNew(W window) {
-    if (!windowToActiveWindow.containsKey(window)) {
-      activeWindowToStateAddressWindows.put(window, new LinkedHashSet<W>());
-    }
-  }
-
-  @Override
-  public void addActive(W window) {
-    if (!windowToActiveWindow.containsKey(window)) {
-      Set<W> stateAddressWindows = new LinkedHashSet<>();
-      stateAddressWindows.add(window);
-      activeWindowToStateAddressWindows.put(window, stateAddressWindows);
-      windowToActiveWindow.put(window, window);
-    }
-  }
-
-  @Override
-  public void remove(W window) {
-    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
-    if (stateAddressWindows == null) {
-      // Window is no longer active.
-      return;
-    }
-    for (W stateAddressWindow : stateAddressWindows) {
-      windowToActiveWindow.remove(stateAddressWindow);
-    }
-    activeWindowToStateAddressWindows.remove(window);
-    Set<W> ephemeralWindows = activeWindowToEphemeralWindows.get(window);
-    if (ephemeralWindows != null) {
-      for (W ephemeralWindow : ephemeralWindows) {
-        windowToActiveWindow.remove(ephemeralWindow);
-      }
-      activeWindowToEphemeralWindows.remove(window);
-    }
-    windowToActiveWindow.remove(window);
-  }
-
-  private class MergeContextImpl extends WindowFn<Object, W>.MergeContext {
-    private MergeCallback<W> mergeCallback;
-    private final List<Collection<W>> allToBeMerged;
-    private final List<Collection<W>> allActiveToBeMerged;
-    private final List<W> allMergeResults;
-    private final Set<W> seen;
-
-    public MergeContextImpl(MergeCallback<W> mergeCallback) {
-      windowFn.super();
-      this.mergeCallback = mergeCallback;
-      allToBeMerged = new ArrayList<>();
-      allActiveToBeMerged = new ArrayList<>();
-      allMergeResults = new ArrayList<>();
-      seen = new HashSet<>();
-    }
-
-    @Override
-    public Collection<W> windows() {
-      return activeWindowToStateAddressWindows.keySet();
-    }
-
-    @Override
-    public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
-      // The arguments have come from userland.
-      Preconditions.checkNotNull(toBeMerged);
-      Preconditions.checkNotNull(mergeResult);
-      List<W> copyOfToBeMerged = new ArrayList<>(toBeMerged.size());
-      List<W> activeToBeMerged = new ArrayList<>(toBeMerged.size());
-      boolean includesMergeResult = false;
-      for (W window : toBeMerged) {
-        Preconditions.checkNotNull(window);
-        Preconditions.checkState(
-            isActive(window), "Expecting merge window %s to be active", window);
-        if (window.equals(mergeResult)) {
-          includesMergeResult = true;
-        }
-        boolean notDup = seen.add(window);
-        Preconditions.checkState(
-            notDup, "Expecting merge window %s to appear in at most one merge set", window);
-        copyOfToBeMerged.add(window);
-        if (!activeWindowToStateAddressWindows.get(window).isEmpty()) {
-          activeToBeMerged.add(window);
-        }
-      }
-      if (!includesMergeResult) {
-        Preconditions.checkState(
-            !isActive(mergeResult), "Expecting result window %s to be new", mergeResult);
-      }
-      allToBeMerged.add(copyOfToBeMerged);
-      allActiveToBeMerged.add(activeToBeMerged);
-      allMergeResults.add(mergeResult);
-    }
-
-    public void recordMerges() throws Exception {
-      for (int i = 0; i < allToBeMerged.size(); i++) {
-        mergeCallback.prefetchOnMerge(
-            allToBeMerged.get(i), allActiveToBeMerged.get(i), allMergeResults.get(i));
-      }
-      for (int i = 0; i < allToBeMerged.size(); i++) {
-        mergeCallback.onMerge(
-            allToBeMerged.get(i), allActiveToBeMerged.get(i), allMergeResults.get(i));
-        recordMerge(allToBeMerged.get(i), allMergeResults.get(i));
-      }
-      allToBeMerged.clear();
-      allActiveToBeMerged.clear();
-      allMergeResults.clear();
-      seen.clear();
-    }
-  }
-
-  @Override
-  public void merge(MergeCallback<W> mergeCallback) throws Exception {
-    MergeContextImpl context = new MergeContextImpl(mergeCallback);
-
-    // See what the window function does with the NEW and already ACTIVE windows.
-    // Entering userland.
-    windowFn.mergeWindows(context);
-
-    // Actually do the merging and invoke the callbacks.
-    context.recordMerges();
-
-    // Any remaining NEW windows should become implicitly ACTIVE.
-    for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
-      if (entry.getValue().isEmpty()) {
-        // This window was NEW but since it survived merging must now become ACTIVE.
-        W window = entry.getKey();
-        entry.getValue().add(window);
-        windowToActiveWindow.put(window, window);
-      }
-    }
-  }
-
-  /**
-   * A {@link WindowFn#mergeWindows} call has determined that {@code toBeMerged} (which must
-   * all be ACTIVE) should be considered equivalent to {@code mergeResult} (which is either a
-   * member of {@code toBeMerged} or is a new window). Make the corresponding change in
-   * the active window set.
-   */
-  private void recordMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
-    Set<W> newStateAddressWindows = new LinkedHashSet<>();
-    Set<W> existingStateAddressWindows = activeWindowToStateAddressWindows.get(mergeResult);
-    if (existingStateAddressWindows != null) {
-      // Preserve all the existing state address windows for mergeResult.
-      newStateAddressWindows.addAll(existingStateAddressWindows);
-    }
-
-    Set<W> newEphemeralWindows = new HashSet<>();
-    Set<W> existingEphemeralWindows = activeWindowToEphemeralWindows.get(mergeResult);
-    if (existingEphemeralWindows != null) {
-      // Preserve all the existing EPHEMERAL windows for mergeResult.
-      newEphemeralWindows.addAll(existingEphemeralWindows);
-    }
-
-    for (W other : toBeMerged) {
-      Set<W> otherStateAddressWindows = activeWindowToStateAddressWindows.get(other);
-      Preconditions.checkState(otherStateAddressWindows != null, "Window %s is not ACTIVE", other);
-
-      for (W otherStateAddressWindow : otherStateAddressWindows) {
-        // Since otherStateAddressWindow equiv other AND other equiv mergeResult
-        // THEN otherStateAddressWindow equiv mergeResult.
-        newStateAddressWindows.add(otherStateAddressWindow);
-        windowToActiveWindow.put(otherStateAddressWindow, mergeResult);
-      }
-      activeWindowToStateAddressWindows.remove(other);
-
-      Set<W> otherEphemeralWindows = activeWindowToEphemeralWindows.get(other);
-      if (otherEphemeralWindows != null) {
-        for (W otherEphemeral : otherEphemeralWindows) {
-          // Since otherEphemeral equiv other AND other equiv mergeResult
-          // THEN otherEphemeral equiv mergeResult.
-          newEphemeralWindows.add(otherEphemeral);
-          windowToActiveWindow.put(otherEphemeral, mergeResult);
-        }
-      }
-      activeWindowToEphemeralWindows.remove(other);
-
-      // Now other equiv mergeResult.
-      if (otherStateAddressWindows.contains(other)) {
-        // Other was ACTIVE and is now known to be MERGED.
-      } else if (otherStateAddressWindows.isEmpty()) {
-        // Other was NEW thus has no state. It is now EPHEMERAL.
-        newEphemeralWindows.add(other);
-      } else if (other.equals(mergeResult)) {
-        // Other was ACTIVE, was never used to store elements, but is still ACTIVE.
-        // Leave it as active.
-      } else {
-        // Other was ACTIVE, was never used to store elements, and is no longer considered ACTIVE.
-        // It is now EPHEMERAL.
-        newEphemeralWindows.add(other);
-      }
-      windowToActiveWindow.put(other, mergeResult);
-    }
-
-    if (newStateAddressWindows.isEmpty()) {
-      // If stateAddressWindows is empty then toBeMerged must have only contained EPHEMERAL windows.
-      // Promote mergeResult to be active now.
-      newStateAddressWindows.add(mergeResult);
-    }
-    windowToActiveWindow.put(mergeResult, mergeResult);
-
-    activeWindowToStateAddressWindows.put(mergeResult, newStateAddressWindows);
-    if (!newEphemeralWindows.isEmpty()) {
-      activeWindowToEphemeralWindows.put(mergeResult, newEphemeralWindows);
-    }
-
-    merged(mergeResult);
-  }
-
-  @Override
-  public void merged(W window) {
-    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
-    Preconditions.checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window);
-    W first = Iterables.getFirst(stateAddressWindows, null);
-    stateAddressWindows.clear();
-    stateAddressWindows.add(first);
-  }
-
-  /**
-   * Return the state address windows for ACTIVE {@code window} from which all state associated
-   * should
-   * be read and merged.
-   */
-  @Override
-  public Set<W> readStateAddresses(W window) {
-    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
-    Preconditions.checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window);
-    return stateAddressWindows;
-  }
-
-  /**
-   * Return the state address window of ACTIVE {@code window} into which all new state should be
-   * written.
-   */
-  @Override
-  public W writeStateAddress(W window) {
-    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
-    Preconditions.checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window);
-    W result = Iterables.getFirst(stateAddressWindows, null);
-    Preconditions.checkState(result != null, "Window %s is still NEW", window);
-    return result;
-  }
-
-  @Override
-  public W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult) {
-    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(mergeResult);
-    if (stateAddressWindows != null && !stateAddressWindows.isEmpty()) {
-      return Iterables.getFirst(stateAddressWindows, null);
-    }
-    for (W mergedWindow : toBeMerged) {
-      stateAddressWindows = activeWindowToStateAddressWindows.get(mergedWindow);
-      if (stateAddressWindows != null && !stateAddressWindows.isEmpty()) {
-        return Iterables.getFirst(stateAddressWindows, null);
-      }
-    }
-    return mergeResult;
-  }
-
-  @VisibleForTesting
-  public void checkInvariants() {
-    Set<W> knownStateAddressWindows = new HashSet<>();
-    for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
-      W active = entry.getKey();
-      Preconditions.checkState(!entry.getValue().isEmpty(),
-          "Unexpected empty state address window set for ACTIVE window %s", active);
-      for (W stateAddressWindow : entry.getValue()) {
-        Preconditions.checkState(knownStateAddressWindows.add(stateAddressWindow),
-            "%s is in more than one state address window set", stateAddressWindow);
-        Preconditions.checkState(active.equals(windowToActiveWindow.get(stateAddressWindow)),
-            "%s should have %s as its ACTIVE window", stateAddressWindow, active);
-      }
-    }
-    for (Map.Entry<W, Set<W>> entry : activeWindowToEphemeralWindows.entrySet()) {
-      W active = entry.getKey();
-      Preconditions.checkState(activeWindowToStateAddressWindows.containsKey(active),
-          "%s must be ACTIVE window", active);
-      Preconditions.checkState(
-          !entry.getValue().isEmpty(), "Unexpected empty EPHEMERAL set for %s", active);
-      for (W ephemeralWindow : entry.getValue()) {
-        Preconditions.checkState(knownStateAddressWindows.add(ephemeralWindow),
-            "%s is EPHEMERAL/state address of more than one ACTIVE window", ephemeralWindow);
-        Preconditions.checkState(active.equals(windowToActiveWindow.get(ephemeralWindow)),
-            "%s should have %s as its ACTIVE window", ephemeralWindow, active);
-      }
-    }
-    for (Map.Entry<W, W> entry : windowToActiveWindow.entrySet()) {
-      Preconditions.checkState(activeWindowToStateAddressWindows.containsKey(entry.getValue()),
-          "%s should be ACTIVE since representative for %s", entry.getValue(), entry.getKey());
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("MergingActiveWindowSet {\n");
-    for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
-      W active = entry.getKey();
-      Set<W> stateAddressWindows = entry.getValue();
-      if (stateAddressWindows.isEmpty()) {
-        sb.append("  NEW ");
-        sb.append(active);
-        sb.append('\n');
-      } else {
-        sb.append("  ACTIVE ");
-        sb.append(active);
-        sb.append(":\n");
-        for (W stateAddressWindow : stateAddressWindows) {
-          if (stateAddressWindow.equals(active)) {
-            sb.append("    ACTIVE ");
-          } else {
-            sb.append("    MERGED ");
-          }
-          sb.append(stateAddressWindow);
-          sb.append("\n");
-          W active2 = windowToActiveWindow.get(stateAddressWindow);
-          Preconditions.checkState(active2.equals(active));
-        }
-        Set<W> ephemeralWindows = activeWindowToEphemeralWindows.get(active);
-        if (ephemeralWindows != null) {
-          for (W ephemeralWindow : ephemeralWindows) {
-            sb.append("    EPHEMERAL ");
-            sb.append(ephemeralWindow);
-            sb.append('\n');
-          }
-        }
-      }
-    }
-    sb.append("}");
-    return sb.toString();
-  }
-
-  // ======================================================================
-
-  /**
-   * Replace null {@code multimap} with empty map, and replace null entries in {@code multimap} with
-   * empty sets.
-   */
-  private static <W> Map<W, Set<W>> emptyIfNull(@Nullable Map<W, Set<W>> multimap) {
-    if (multimap == null) {
-      return new HashMap<>();
-    } else {
-      for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
-        if (entry.getValue() == null) {
-          entry.setValue(new LinkedHashSet<W>());
-        }
-      }
-      return multimap;
-    }
-  }
-
-  /** Return a deep copy of {@code multimap}. */
-  private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>> multimap) {
-    Map<W, Set<W>> newMultimap = new HashMap<>();
-    for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
-      newMultimap.put(entry.getKey(), new LinkedHashSet<>(entry.getValue()));
-    }
-    return newMultimap;
-  }
-
-  /** Return inversion of {@code multimap}, which must be invertible. */
-  private static <W> Map<W, W> invert(Map<W, Set<W>> multimap) {
-    Map<W, W> result = new HashMap<>();
-    for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
-      W active = entry.getKey();
-      for (W target : entry.getValue()) {
-        W previous = result.put(target, active);
-        Preconditions.checkState(previous == null,
-            "Window %s has both %s and %s as representatives", target, previous, active);
-      }
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MimeTypes.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MimeTypes.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MimeTypes.java
deleted file mode 100644
index 489d183..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MimeTypes.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-/** Constants representing various mime types. */
-public class MimeTypes {
-  public static final String TEXT = "text/plain";
-  public static final String BINARY = "application/octet-stream";
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java
deleted file mode 100644
index d450187..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudTime;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.ListJobMessagesResponse;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * A helper class for monitoring jobs submitted to the service.
- */
-public final class MonitoringUtil {
-
-  private static final String GCLOUD_DATAFLOW_PREFIX = "gcloud alpha dataflow";
-  private static final String ENDPOINT_OVERRIDE_ENV_VAR =
-      "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW";
-
-  private static final Map<String, State> DATAFLOW_STATE_TO_JOB_STATE =
-      ImmutableMap
-          .<String, State>builder()
-          .put("JOB_STATE_UNKNOWN", State.UNKNOWN)
-          .put("JOB_STATE_STOPPED", State.STOPPED)
-          .put("JOB_STATE_RUNNING", State.RUNNING)
-          .put("JOB_STATE_DONE", State.DONE)
-          .put("JOB_STATE_FAILED", State.FAILED)
-          .put("JOB_STATE_CANCELLED", State.CANCELLED)
-          .put("JOB_STATE_UPDATED", State.UPDATED)
-          .build();
-
-  private String projectId;
-  private Messages messagesClient;
-
-  /**
-   * An interface that can be used for defining callbacks to receive a list
-   * of JobMessages containing monitoring information.
-   */
-  public interface JobMessagesHandler {
-    /** Process the messages. */
-    void process(List<JobMessage> messages);
-  }
-
-  /** A handler that prints monitoring messages to a stream. */
-  public static class PrintHandler implements JobMessagesHandler {
-    private PrintStream out;
-
-    /**
-     * Construct the handler.
-     *
-     * @param stream The stream to write the messages to.
-     */
-    public PrintHandler(PrintStream stream) {
-      out = stream;
-    }
-
-    @Override
-    public void process(List<JobMessage> messages) {
-      for (JobMessage message : messages) {
-        if (message.getMessageText() == null || message.getMessageText().isEmpty()) {
-          continue;
-        }
-        String importanceString = null;
-        if (message.getMessageImportance() == null) {
-          continue;
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
-          importanceString = "Error:   ";
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_WARNING")) {
-          importanceString = "Warning: ";
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_BASIC")) {
-          importanceString = "Basic:  ";
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_DETAILED")) {
-          importanceString = "Detail:  ";
-        } else {
-          // TODO: Remove filtering here once getJobMessages supports minimum
-          // importance.
-          continue;
-        }
-        @Nullable Instant time = TimeUtil.fromCloudTime(message.getTime());
-        if (time == null) {
-          out.print("UNKNOWN TIMESTAMP: ");
-        } else {
-          out.print(time + ": ");
-        }
-        if (importanceString != null) {
-          out.print(importanceString);
-        }
-        out.println(message.getMessageText());
-      }
-      out.flush();
-    }
-  }
-
-  /** Construct a helper for monitoring. */
-  public MonitoringUtil(String projectId, Dataflow dataflow) {
-    this(projectId, dataflow.projects().jobs().messages());
-  }
-
-  // @VisibleForTesting
-  MonitoringUtil(String projectId, Messages messagesClient) {
-    this.projectId = projectId;
-    this.messagesClient = messagesClient;
-  }
-
-  /**
-   * Comparator for sorting messages in increasing order based on timestamp.
-   */
-  public static class TimeStampComparator implements Comparator<JobMessage> {
-    @Override
-    public int compare(JobMessage o1, JobMessage o2) {
-      @Nullable Instant t1 = fromCloudTime(o1.getTime());
-      if (t1 == null) {
-        return -1;
-      }
-      @Nullable Instant t2 = fromCloudTime(o2.getTime());
-      if (t2 == null) {
-        return 1;
-      }
-      return t1.compareTo(t2);
-    }
-  }
-
-  /**
-   * Return job messages sorted in ascending order by timestamp.
-   * @param jobId The id of the job to get the messages for.
-   * @param startTimestampMs Return only those messages with a
-   *   timestamp greater than this value.
-   * @return collection of messages
-   * @throws IOException
-   */
-  public ArrayList<JobMessage> getJobMessages(
-      String jobId, long startTimestampMs) throws IOException {
-    // TODO: Allow filtering messages by importance
-    Instant startTimestamp = new Instant(startTimestampMs);
-    ArrayList<JobMessage> allMessages = new ArrayList<>();
-    String pageToken = null;
-    while (true) {
-      Messages.List listRequest = messagesClient.list(projectId, jobId);
-      if (pageToken != null) {
-        listRequest.setPageToken(pageToken);
-      }
-      ListJobMessagesResponse response = listRequest.execute();
-
-      if (response == null || response.getJobMessages() == null) {
-        return allMessages;
-      }
-
-      for (JobMessage m : response.getJobMessages()) {
-        @Nullable Instant timestamp = fromCloudTime(m.getTime());
-        if (timestamp == null) {
-          continue;
-        }
-        if (timestamp.isAfter(startTimestamp)) {
-          allMessages.add(m);
-        }
-      }
-
-      if (response.getNextPageToken() == null) {
-        break;
-      } else {
-        pageToken = response.getNextPageToken();
-      }
-    }
-
-    Collections.sort(allMessages, new TimeStampComparator());
-    return allMessages;
-  }
-
-  public static String getJobMonitoringPageURL(String projectName, String jobId) {
-    try {
-      // Project name is allowed in place of the project id: the user will be redirected to a URL
-      // that has the project name replaced with project id.
-      return String.format(
-          "https://console.developers.google.com/project/%s/dataflow/job/%s",
-          URLEncoder.encode(projectName, "UTF-8"),
-          URLEncoder.encode(jobId, "UTF-8"));
-    } catch (UnsupportedEncodingException e) {
-      // Should never happen.
-      throw new AssertionError("UTF-8 encoding is not supported by the environment", e);
-    }
-  }
-
-  public static String getGcloudCancelCommand(DataflowPipelineOptions options, String jobId) {
-
-    // If using a different Dataflow API than default, prefix command with an API override.
-    String dataflowApiOverridePrefix = "";
-    String apiUrl = options.getDataflowClient().getBaseUrl();
-    if (!apiUrl.equals(Dataflow.DEFAULT_BASE_URL)) {
-      dataflowApiOverridePrefix = String.format("%s=%s ", ENDPOINT_OVERRIDE_ENV_VAR, apiUrl);
-    }
-
-    // Assemble cancel command from optional prefix and project/job parameters.
-    return String.format("%s%s jobs --project=%s cancel %s",
-        dataflowApiOverridePrefix, GCLOUD_DATAFLOW_PREFIX, options.getProject(), jobId);
-  }
-
-  public static State toState(String stateName) {
-    return MoreObjects.firstNonNull(DATAFLOW_STATE_TO_JOB_STATE.get(stateName),
-        State.UNKNOWN);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetector.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetector.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetector.java
deleted file mode 100644
index 51e65ab..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetector.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-/**
- * An object for detecting illegal mutations.
- *
- * <p>The {@link AutoCloseable} aspect of this interface allows use in a try-with-resources
- * style, where the implementing class may choose to perform a final mutation check upon
- * {@link #close()}.
- */
-public interface MutationDetector extends AutoCloseable {
-  /**
-   * @throws IllegalMutationException if illegal mutations are detected.
-   */
-  void verifyUnmodified();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetectors.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetectors.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetectors.java
deleted file mode 100644
index 412e3eb..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetectors.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.common.base.Throwables;
-
-import java.util.Arrays;
-import java.util.Objects;
-
-/**
- * Static methods for creating and working with {@link MutationDetector}.
- */
-public class MutationDetectors {
-
-  private MutationDetectors() {}
-
-  /**
-   * Creates a new {@code MutationDetector} for the provided {@code value} that uses the provided
-   * {@link Coder} to perform deep copies and comparisons by serializing and deserializing values.
-   *
-   * <p>It is permissible for {@code value} to be {@code null}. Since {@code null} is immutable,
-   * the mutation check will always succeed.
-   */
-  public static <T> MutationDetector forValueWithCoder(T value, Coder<T> coder)
-      throws CoderException {
-    if (value == null) {
-      return noopMutationDetector();
-    } else {
-      return new CodedValueMutationDetector<>(value, coder);
-    }
-  }
-
-  /**
-   * Creates a new {@code MutationDetector} that always succeeds.
-   *
-   * <p>This is useful, for example, for providing a very efficient mutation detector for a value
-   * which is already immutable by design.
-   */
-  public static MutationDetector noopMutationDetector() {
-    return new NoopMutationDetector();
-  }
-
-  /**
-   * A {@link MutationDetector} that always succeeds; used for {@code null} and immutable values.
-   */
-  private static class NoopMutationDetector implements MutationDetector {
-
-    @Override
-    public void verifyUnmodified() { }
-
-    @Override
-    public void close() { }
-  }
-
-  /**
-   * Given a value of type {@code T} and a {@link Coder} for that type, saves an encoded copy of
-   * the value and provides facilities to check that the value has not changed.
-   *
-   * @param <T> the type of values checked for mutation
-   */
-  private static class CodedValueMutationDetector<T> implements MutationDetector {
-
-    private final Coder<T> coder;
-
-    /**
-     * A saved pointer to an in-memory value provided upon construction, which we will check for
-     * forbidden mutations.
-     */
-    private final T possiblyModifiedObject;
-
-    /**
-     * A saved encoded copy of the same value as {@link #possiblyModifiedObject}. Naturally, it
-     * will not change if {@link #possiblyModifiedObject} is mutated.
-     */
-    private final byte[] encodedOriginalObject;
-
-    /**
-     * The object decoded from {@link #encodedOriginalObject}. It will be used during every call to
-     * {@link #verifyUnmodified}, which could be called many times throughout the lifetime of this
-     * {@link CodedValueMutationDetector}.
-     */
-    private final T clonedOriginalObject;
-
-    /**
-     * Create a mutation detector for the provided {@code value}, using the provided {@link Coder}
-     * for cloning and checking serialized forms for equality.
-     */
-    public CodedValueMutationDetector(T value, Coder<T> coder) throws CoderException {
-      this.coder = coder;
-      this.possiblyModifiedObject = value;
-      this.encodedOriginalObject = CoderUtils.encodeToByteArray(coder, value);
-      this.clonedOriginalObject = CoderUtils.decodeFromByteArray(coder, encodedOriginalObject);
-    }
-
-    @Override
-    public void verifyUnmodified() {
-      try {
-        verifyUnmodifiedThrowingCheckedExceptions();
-      } catch (CoderException exn) {
-        Throwables.propagate(exn);
-      }
-    }
-
-    private void verifyUnmodifiedThrowingCheckedExceptions() throws CoderException {
-      // If either object believes they are equal, we trust that and short-circuit deeper checks.
-      if (Objects.equals(possiblyModifiedObject, clonedOriginalObject)
-          || Objects.equals(clonedOriginalObject, possiblyModifiedObject)) {
-        return;
-      }
-
-      // Since possiblyModifiedObject is in general an instance of a subclass of T, when it is
-      // cloned to clonedOriginalObject using a Coder<T>, the two will generally be equivalent
-      // viewed as a T, but in general neither of the two equals() checks above will hold.
-      //
-      // For example, CoderUtils.clone(IterableCoder<Integer>, IterableSubclass<Integer>) will
-      // produce an ArrayList<Integer> with the same contents as the IterableSubclass, but the
-      // latter will quite reasonably not consider itself equivalent to an ArrayList (and vice
-      // versa).
-      //
-      // To enable a reasonable comparison, we clone possiblyModifiedObject again here,
-      // converting it to the same sort of T that the Coder<T> output when it created
-      // clonedOriginalObject.
-      T clonedPossiblyModifiedObject = CoderUtils.clone(coder, possiblyModifiedObject);
-
-      // If deepEquals() then we trust the equals implementation.
-      // This deliberately allows fields to escape this check.
-      if (Objects.deepEquals(clonedPossiblyModifiedObject, clonedOriginalObject)) {
-        return;
-      }
-
-      // If not deepEquals(), the class may just have a poor equals() implementation.
-      // So we next try checking their serialized forms. We re-serialize instead of checking
-      // encodedOriginalObject, because the Coder may treat it differently.
-      //
-      // For example, an unbounded Iterable will be encoded in an unbounded way, but decoded into an
-      // ArrayList, which will then be re-encoded in a bounded format. So we really do need to
-      // encode-decode-encode possiblyModifiedObject.
-      if (Arrays.equals(
-          CoderUtils.encodeToByteArray(coder, clonedOriginalObject),
-          CoderUtils.encodeToByteArray(coder, clonedPossiblyModifiedObject))) {
-        return;
-      }
-
-      // If we got here, then they are not deepEquals() and do not have byte-equal encodings.
-      // Even if there is some conceptual sense in which the objects are equivalent, it has not
-      // been adequately expressed in code.
-      illegalMutation(clonedOriginalObject, clonedPossiblyModifiedObject);
-    }
-
-    private void illegalMutation(T previousValue, T newValue) throws CoderException {
-      throw new IllegalMutationException(
-          String.format("Value %s mutated illegally, new value was %s."
-              + " Encoding was %s, now %s.",
-              previousValue, newValue,
-              CoderUtils.encodeToBase64(coder, previousValue),
-              CoderUtils.encodeToBase64(coder, newValue)),
-          previousValue, newValue);
-    }
-
-    @Override
-    public void close() {
-      verifyUnmodified();
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonEmptyPanes.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonEmptyPanes.java
deleted file mode 100644
index 1270f01..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonEmptyPanes.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode;
-import com.google.cloud.dataflow.sdk.util.state.AccumulatorCombiningState;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.ReadableState;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateMerging;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.cloud.dataflow.sdk.util.state.StateTags;
-
-/**
- * Tracks which windows have non-empty panes. Specifically, which windows have new elements since
- * their last triggering.
- *
- * @param <W> The kind of windows being tracked.
- */
-public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
-
-  static <K, W extends BoundedWindow> NonEmptyPanes<K, W> create(
-      WindowingStrategy<?, W> strategy, ReduceFn<K, ?, ?, W> reduceFn) {
-    if (strategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES) {
-      return new DiscardingModeNonEmptyPanes<>(reduceFn);
-    } else {
-      return new GeneralNonEmptyPanes<>();
-    }
-  }
-
-  /**
-   * Record that some content has been added to the window in {@code context}, and therefore the
-   * current pane is not empty.
-   */
-  public abstract void recordContent(StateAccessor<K> context);
-
-  /**
-   * Record that the given pane is empty.
-   */
-  public abstract void clearPane(StateAccessor<K> state);
-
-  /**
-   * Return true if the current pane for the window in {@code context} is empty.
-   */
-  public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context);
-
-  /**
-   * Prefetch in preparation for merging.
-   */
-  public abstract void prefetchOnMerge(MergingStateAccessor<K, W> state);
-
-  /**
-   * Eagerly merge backing state.
-   */
-  public abstract void onMerge(MergingStateAccessor<K, W> context);
-
-  /**
-   * An implementation of {@code NonEmptyPanes} optimized for use with discarding mode. Uses the
-   * presence of data in the accumulation buffer to record non-empty panes.
-   */
-  private static class DiscardingModeNonEmptyPanes<K, W extends BoundedWindow>
-      extends NonEmptyPanes<K, W> {
-
-    private ReduceFn<K, ?, ?, W> reduceFn;
-
-    private DiscardingModeNonEmptyPanes(ReduceFn<K, ?, ?, W> reduceFn) {
-      this.reduceFn = reduceFn;
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
-      return reduceFn.isEmpty(state);
-    }
-
-    @Override
-    public void recordContent(StateAccessor<K> state) {
-      // Nothing to do -- the reduceFn is tracking contents
-    }
-
-    @Override
-    public void clearPane(StateAccessor<K> state) {
-      // Nothing to do -- the reduceFn is tracking contents
-    }
-
-    @Override
-    public void prefetchOnMerge(MergingStateAccessor<K, W> state) {
-      // Nothing to do -- the reduceFn is tracking contents
-    }
-
-    @Override
-    public void onMerge(MergingStateAccessor<K, W> context) {
-      // Nothing to do -- the reduceFn is tracking contents
-    }
-  }
-
-  /**
-   * An implementation of {@code NonEmptyPanes} for general use.
-   */
-  private static class GeneralNonEmptyPanes<K, W extends BoundedWindow>
-      extends NonEmptyPanes<K, W> {
-
-    private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
-        PANE_ADDITIONS_TAG =
-        StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-            "count", VarLongCoder.of(), new Sum.SumLongFn()));
-
-    @Override
-    public void recordContent(StateAccessor<K> state) {
-      state.access(PANE_ADDITIONS_TAG).add(1L);
-    }
-
-    @Override
-    public void clearPane(StateAccessor<K> state) {
-      state.access(PANE_ADDITIONS_TAG).clear();
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
-      return state.access(PANE_ADDITIONS_TAG).isEmpty();
-    }
-
-    @Override
-    public void prefetchOnMerge(MergingStateAccessor<K, W> state) {
-      StateMerging.prefetchCombiningValues(state, PANE_ADDITIONS_TAG);
-    }
-
-    @Override
-    public void onMerge(MergingStateAccessor<K, W> context) {
-      StateMerging.mergeCombiningValues(context, PANE_ADDITIONS_TAG);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java
deleted file mode 100644
index cb7f9b0..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.common.collect.ImmutableSet;
-
-import java.util.Collection;
-import java.util.Set;
-
-/**
- * Implementation of {@link ActiveWindowSet} used with {@link WindowFn WindowFns} that don't support
- * merging.
- *
- * @param <W> the types of windows being managed
- */
-public class NonMergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
-  @Override
-  public void removeEphemeralWindows() {}
-
-  @Override
-  public void persist() {}
-
-  @Override
-  public W representative(W window) {
-    // Always represented by itself.
-    return window;
-  }
-
-  @Override
-  public Set<W> getActiveWindows() {
-    // Only supported when merging.
-    throw new java.lang.UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean isActive(W window) {
-    // Windows should never disappear, since we don't support merging.
-    return true;
-  }
-
-  @Override
-  public void addNew(W window) {}
-
-  @Override
-  public void addActive(W window) {}
-
-  @Override
-  public void remove(W window) {}
-
-  @Override
-  public void merge(MergeCallback<W> mergeCallback) throws Exception {}
-
-  @Override
-  public void merged(W window) {}
-
-  @Override
-  public Set<W> readStateAddresses(W window) {
-    return ImmutableSet.of(window);
-  }
-
-  @Override
-  public W writeStateAddress(W window) {
-    return window;
-  }
-
-  @Override
-  public W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult) {
-    return mergeResult;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopCredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopCredentialFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopCredentialFactory.java
deleted file mode 100644
index 9ef4c2e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopCredentialFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-
-/**
- * Construct an oauth credential to be used by the SDK and the SDK workers.
- * Always returns a null Credential object.
- */
-public class NoopCredentialFactory implements CredentialFactory {
-  public static NoopCredentialFactory fromOptions(PipelineOptions options) {
-    return new NoopCredentialFactory();
-  }
-
-  @Override
-  public Credential getCredential() throws IOException, GeneralSecurityException {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopPathValidator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopPathValidator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopPathValidator.java
deleted file mode 100644
index 00abbb1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopPathValidator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-/**
- * Noop implementation of {@link PathValidator}. All paths are allowed and returned unchanged.
- */
-public class NoopPathValidator implements PathValidator {
-
-  private NoopPathValidator() {
-  }
-
-  public static PathValidator fromOptions(
-      @SuppressWarnings("unused") PipelineOptions options) {
-    return new NoopPathValidator();
-  }
-
-  @Override
-  public String validateInputFilePatternSupported(String filepattern) {
-    return filepattern;
-  }
-
-  @Override
-  public String validateOutputFilePrefixSupported(String filePrefix) {
-    return filePrefix;
-  }
-
-  @Override
-  public String verifyPath(String path) {
-    return path;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NullSideInputReader.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NullSideInputReader.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NullSideInputReader.java
deleted file mode 100644
index 0fc2646..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NullSideInputReader.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.common.collect.Sets;
-
-import java.util.Collections;
-import java.util.Set;
-
-/**
- * A {@link SideInputReader} representing a well-defined set of views, but not storing
- * any values for them. Used to check if a side input is present when the data itself
- * comes from elsewhere.
- */
-public class NullSideInputReader implements SideInputReader {
-
-  private Set<PCollectionView<?>> views;
-
-  public static NullSideInputReader empty() {
-    return new NullSideInputReader(Collections.<PCollectionView<?>>emptySet());
-  }
-
-  public static NullSideInputReader of(Iterable<? extends PCollectionView<?>> views) {
-    return new NullSideInputReader(views);
-  }
-
-  private NullSideInputReader(Iterable<? extends PCollectionView<?>> views) {
-    this.views = Sets.newHashSet(views);
-  }
-
-  @Override
-  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
-    throw new IllegalArgumentException("cannot call NullSideInputReader.get()");
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return views.isEmpty();
-  }
-
-  @Override
-  public <T> boolean contains(PCollectionView<T> view) {
-    return views.contains(view);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/OutputReference.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/OutputReference.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/OutputReference.java
deleted file mode 100644
index 096c996..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/OutputReference.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.api.client.util.Preconditions.checkNotNull;
-
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.util.Key;
-
-/**
- * A representation used by {@link com.google.api.services.dataflow.model.Step}s
- * to reference the output of other {@code Step}s.
- */
-public final class OutputReference extends GenericJson {
-  @Key("@type")
-  public final String type = "OutputReference";
-
-  @Key("step_name")
-  private final String stepName;
-
-  @Key("output_name")
-  private final String outputName;
-
-  public OutputReference(String stepName, String outputName) {
-    this.stepName = checkNotNull(stepName);
-    this.outputName = checkNotNull(outputName);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViewWindow.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViewWindow.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViewWindow.java
deleted file mode 100644
index 7cf636e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViewWindow.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-import java.util.Objects;
-
-/**
- * A pair of a {@link PCollectionView} and a {@link BoundedWindow}, which can
- * be thought of as a window "of" the view. This is a value class for use e.g.
- * as a compound cache key.
- *
- * @param <T> the type of the underlying PCollectionView
- */
-public final class PCollectionViewWindow<T> {
-
-  private final PCollectionView<T> view;
-  private final BoundedWindow window;
-
-  private PCollectionViewWindow(PCollectionView<T> view, BoundedWindow window) {
-    this.view = view;
-    this.window = window;
-  }
-
-  public static <T> PCollectionViewWindow<T> of(PCollectionView<T> view, BoundedWindow window) {
-    return new PCollectionViewWindow<>(view, window);
-  }
-
-  public PCollectionView<T> getView() {
-    return view;
-  }
-
-  public BoundedWindow getWindow() {
-    return window;
-  }
-
-  @Override
-  public boolean equals(Object otherObject) {
-    if (!(otherObject instanceof PCollectionViewWindow)) {
-      return false;
-    }
-    @SuppressWarnings("unchecked")
-    PCollectionViewWindow<T> other = (PCollectionViewWindow<T>) otherObject;
-    return getView().equals(other.getView()) && getWindow().equals(other.getWindow());
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(getView(), getWindow());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViews.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViews.java
deleted file mode 100644
index 7e73547..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViews.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PValueBase;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * Implementations of {@link PCollectionView} shared across the SDK.
- *
- * <p>For internal use only, subject to change.
- */
-public class PCollectionViews {
-
-  /**
-   * Builds a {@code PCollectionView<T>} for elements that are encoded with the given
-   * {@link Coder} and windowed according to the given {@link WindowingStrategy}.
-   *
-   * <p>When {@code hasDefault} is {@code true}, the view takes on the value
-   * {@code defaultValue} for any empty windows.
-   */
-  public static <T, W extends BoundedWindow> PCollectionView<T> singletonView(
-      Pipeline pipeline,
-      WindowingStrategy<?, W> windowingStrategy,
-      boolean hasDefault,
-      T defaultValue,
-      Coder<T> valueCoder) {
-    PCollectionView<T> singleton =
-        new SingletonPCollectionView<T, W>(
-            pipeline, windowingStrategy, hasDefault, defaultValue, valueCoder);
-    return singleton;
-  }
-
-  /**
-   * Builds a {@code PCollectionView<Iterable<T>>} for elements that are encoded with the given
-   * {@link Coder} and windowed according to the given {@link WindowingStrategy}.
-   */
-  public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableView(
-      Pipeline pipeline,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<T> valueCoder) {
-    PCollectionView<Iterable<T>> iterable =
-        new IterablePCollectionView<T, W>(pipeline, windowingStrategy, valueCoder);
-    return iterable;
-  }
-
-  /**
-   * Builds a {@code PCollectionView<List<T>>} for elements that are encoded with the given
-   * {@link Coder} and windowed according to the given {@link WindowingStrategy}.
-   */
-  public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
-      Pipeline pipeline,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<T> valueCoder) {
-    PCollectionView<List<T>> list =
-        new ListPCollectionView<T, W>(pipeline, windowingStrategy, valueCoder);
-    return list;
-  }
-
-  /**
-   * Builds a {@code PCollectionView<Map<K, V>>} for key/value elements that are encoded with
-   * the given {@link Coder} and windowed according to the given {@link WindowingStrategy}.
-   */
-  public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapView(
-      Pipeline pipeline,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<KV<K, V>> valueCoder) {
-    PCollectionView<Map<K, V>> map =
-        new MapPCollectionView<K, V, W>(pipeline, windowingStrategy, valueCoder);
-    return map;
-  }
-
-  /**
-   * Builds a {@code PCollectionView<Map<K, Iterable<V>>>} for key/value elements that are
-   * encoded with the given {@link Coder} and windowed according to the given
-   * {@link WindowingStrategy}.
-   */
-  public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> multimapView(
-      Pipeline pipeline,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<KV<K, V>> valueCoder) {
-    PCollectionView<Map<K, Iterable<V>>> multimap =
-        new MultimapPCollectionView<K, V, W>(pipeline, windowingStrategy, valueCoder);
-    return multimap;
-  }
-
-  /**
-   * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T},
-   * with an optional default value that is used when the iterable is empty.
-   *
-   * <p>For internal use only.
-   *
-   * <p>Instantiate via {@link PCollectionViews#singletonView}.
-   */
-  public static class SingletonPCollectionView<T, W extends BoundedWindow>
-      extends PCollectionViewBase<T, T, W> {
-    /** The default value encoded with {@link #valueCoder}; set only when a default exists. */
-    @Nullable private byte[] encodedDefaultValue;
-
-    /** The default value in object form. Transient: it is rebuilt from the encoded bytes. */
-    @Nullable private transient T defaultValue;
-
-    /**
-     * Whether {@link #defaultValue} has already been decoded on this instance. Transient, so a
-     * deserialized copy starts out undecoded and decodes again on first use.
-     */
-    private transient boolean defaultValueDecoded;
-
-    @Nullable private Coder<T> valueCoder;
-    private boolean hasDefault;
-
-    /**
-     * Creates the view and, when {@code hasDefault} is set, eagerly encodes
-     * {@code defaultValue} with {@code valueCoder}.
-     *
-     * @throws RuntimeException if encoding the default value fails with an {@link IOException}
-     */
-    private SingletonPCollectionView(
-        Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy,
-        boolean hasDefault, T defaultValue, Coder<T> valueCoder) {
-      super(pipeline, windowingStrategy, valueCoder);
-      this.hasDefault = hasDefault;
-      this.defaultValue = defaultValue;
-      this.valueCoder = valueCoder;
-      if (hasDefault) {
-        try {
-          this.encodedDefaultValue = CoderUtils.encodeToByteArray(valueCoder, defaultValue);
-        } catch (IOException e) {
-          throw new RuntimeException("Unexpected IOException: ", e);
-        }
-      }
-    }
-
-    /**
-     * Returns the default value that was specified.
-     *
-     * <p>For internal use only.
-     *
-     * @throws NoSuchElementException if no default was specified.
-     * @throws RuntimeException if decoding the default value fails with an {@link IOException}
-     */
-    public T getDefaultValue() {
-      if (!hasDefault) {
-        throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
-      }
-      // Lazily decode the default value once per instance. The encoded bytes are deliberately
-      // retained: defaultValue is transient, so discarding them would silently drop the default
-      // if this view were serialized again after the first call.
-      synchronized (this) {
-        if (!defaultValueDecoded) {
-          if (encodedDefaultValue != null) {
-            try {
-              defaultValue = CoderUtils.decodeFromByteArray(valueCoder, encodedDefaultValue);
-            } catch (IOException e) {
-              throw new RuntimeException("Unexpected IOException: ", e);
-            }
-          }
-          defaultValueDecoded = true;
-        }
-        return defaultValue;
-      }
-    }
-
-    /**
-     * Returns the value of the only element of {@code contents}, or the default value when
-     * {@code contents} is empty.
-     *
-     * @throws NoSuchElementException if {@code contents} is empty and no default was specified
-     * @throws IllegalArgumentException if {@code contents} holds more than one element
-     */
-    @Override
-    protected T fromElements(Iterable<WindowedValue<T>> contents) {
-      try {
-        return Iterables.getOnlyElement(contents).getValue();
-      } catch (NoSuchElementException exc) {
-        return getDefaultValue();
-      } catch (IllegalArgumentException exc) {
-        // Keep the original exception as the cause so its details are not lost.
-        throw new IllegalArgumentException(
-            "PCollection with more than one element "
-            + "accessed as a singleton view.", exc);
-      }
-    }
-  }
-
-  /**
-   * A view that exposes an {@code Iterable<WindowedValue<T>>} as an unmodifiable
-   * {@code Iterable<T>} of the unwrapped values.
-   *
-   * <p>For internal use only.
-   *
-   * <p>Instantiate via {@link PCollectionViews#iterableView}.
-   */
-  public static class IterablePCollectionView<T, W extends BoundedWindow>
-      extends PCollectionViewBase<T, Iterable<T>, W> {
-    private IterablePCollectionView(
-        Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<T> valueCoder) {
-      super(pipeline, windowingStrategy, valueCoder);
-    }
-
-    @Override
-    protected Iterable<T> fromElements(Iterable<WindowedValue<T>> contents) {
-      // Unwraps each windowed value to its payload.
-      Function<WindowedValue<T>, T> unwrapValue =
-          new Function<WindowedValue<T>, T>() {
-            @Override
-            public T apply(WindowedValue<T> windowedValue) {
-              return windowedValue.getValue();
-            }
-          };
-      Iterable<T> values = Iterables.transform(contents, unwrapValue);
-      return Iterables.unmodifiableIterable(values);
-    }
-  }
-
-  /**
-   * A view that copies the values of an {@code Iterable<WindowedValue<T>>} into an immutable
-   * {@code List<T>}.
-   *
-   * <p>For internal use only.
-   *
-   * <p>Instantiate via {@link PCollectionViews#listView}.
-   */
-  public static class ListPCollectionView<T, W extends BoundedWindow>
-      extends PCollectionViewBase<T, List<T>, W> {
-    private ListPCollectionView(
-        Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<T> valueCoder) {
-      super(pipeline, windowingStrategy, valueCoder);
-    }
-
-    @Override
-    protected List<T> fromElements(Iterable<WindowedValue<T>> contents) {
-      // Unwraps each windowed value to its payload.
-      Function<WindowedValue<T>, T> unwrapValue =
-          new Function<WindowedValue<T>, T>() {
-            @Override
-            public T apply(WindowedValue<T> windowedValue) {
-              return windowedValue.getValue();
-            }
-          };
-      Iterable<T> values = Iterables.transform(contents, unwrapValue);
-      return ImmutableList.copyOf(values);
-    }
-  }
-
-  /**
-   * A view that groups the values of an {@code Iterable<WindowedValue<KV<K, V>>>} by key
-   * into an unmodifiable {@code Map<K, Iterable<V>>}.
-   *
-   * <p>For internal use only.
-   */
-  public static class MultimapPCollectionView<K, V, W extends BoundedWindow>
-      extends PCollectionViewBase<KV<K, V>, Map<K, Iterable<V>>, W> {
-    private MultimapPCollectionView(
-        Pipeline pipeline,
-        WindowingStrategy<?, W> windowingStrategy,
-        Coder<KV<K, V>> valueCoder) {
-      super(pipeline, windowingStrategy, valueCoder);
-    }
-
-    @Override
-    protected Map<K, Iterable<V>> fromElements(Iterable<WindowedValue<KV<K, V>>> elements) {
-      // NOTE(review): HashMultimap keeps the values of each key as a set, so repeated identical
-      // key/value pairs collapse into one entry -- confirm that this is intended.
-      Multimap<K, V> valuesByKey = HashMultimap.create();
-      for (WindowedValue<KV<K, V>> windowedEntry : elements) {
-        KV<K, V> entry = windowedEntry.getValue();
-        valuesByKey.put(entry.getKey(), entry.getValue());
-      }
-      // Safe covariant cast (Collection<V> to Iterable<V>) that Java cannot express without
-      // rawtypes, even with unchecked casts
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      Map<K, Iterable<V>> iterablesByKey = (Map) valuesByKey.asMap();
-      return Collections.unmodifiableMap(iterablesByKey);
-    }
-  }
-
-  /**
-   * A view that turns an {@code Iterable<WindowedValue<KV<K, V>>>} holding exactly
-   * one value per key into an unmodifiable {@code Map<K, V>}.
-   *
-   * <p>For internal use only.
-   */
-  public static class MapPCollectionView<K, V, W extends BoundedWindow>
-      extends PCollectionViewBase<KV<K, V>, Map<K, V>, W> {
-    private MapPCollectionView(
-        Pipeline pipeline,
-        WindowingStrategy<?, W> windowingStrategy,
-        Coder<KV<K, V>> valueCoder) {
-      super(pipeline, windowingStrategy, valueCoder);
-    }
-
-    /**
-     * Input iterable must actually be {@code Iterable<WindowedValue<KV<K, V>>>}.
-     *
-     * @throws IllegalArgumentException if a key occurs more than once in {@code elements}
-     */
-    @Override
-    protected Map<K, V> fromElements(Iterable<WindowedValue<KV<K, V>>> elements) {
-      Map<K, V> valueByKey = new HashMap<>();
-      for (WindowedValue<KV<K, V>> windowedEntry : elements) {
-        KV<K, V> entry = windowedEntry.getValue();
-        K key = entry.getKey();
-        // containsKey (rather than a null check on get) also catches keys mapped to null.
-        if (valueByKey.containsKey(key)) {
-          throw new IllegalArgumentException("Duplicate values for " + key);
-        }
-        valueByKey.put(key, entry.getValue());
-      }
-      return Collections.unmodifiableMap(valueByKey);
-    }
-  }
-
-  /**
-   * A base class for {@link PCollectionView} implementations, with additional type parameters
-   * that are not visible at pipeline assembly time when the view is used as a side input.
-   *
-   * <p>Subclasses supply {@link #fromElements}; this class stores the tag, windowing strategy
-   * and coder and exposes them through the untyped {@code *Internal} accessors. Equality and
-   * hashing are based solely on the tag.
-   *
-   * @param <ElemT> the type of the elements underlying the view
-   * @param <ViewT> the type of the value the view produces
-   * @param <W> the window type of the windowing strategy
-   */
-  private abstract static class PCollectionViewBase<ElemT, ViewT, W extends BoundedWindow>
-      extends PValueBase
-      implements PCollectionView<ViewT> {
-    /** A unique tag for the view, typed according to the elements underlying the view. */
-    private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
-
-    /** The windowing strategy for the PCollection underlying the view. */
-    private WindowingStrategy<?, W> windowingStrategy;
-
-    /** The coder for the elements underlying the view. */
-    private Coder<Iterable<WindowedValue<ElemT>>> coder;
-
-    /**
-     * Implement this to complete the implementation. It is a conversion function from
-     * all of the elements of the underlying {@link PCollection} to the value of the view.
-     */
-    protected abstract ViewT fromElements(Iterable<WindowedValue<ElemT>> elements);
-
-    /**
-     * Call this constructor to initialize the fields for which this base class provides
-     * boilerplate accessors.
-     *
-     * @throws IllegalArgumentException if the window function of {@code windowingStrategy}
-     *     is an {@link InvalidWindows}
-     */
-    protected PCollectionViewBase(
-        Pipeline pipeline,
-        TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-        WindowingStrategy<?, W> windowingStrategy,
-        Coder<ElemT> valueCoder) {
-      super(pipeline);
-      if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
-        throw new IllegalArgumentException("WindowFn of PCollectionView cannot be InvalidWindows");
-      }
-      this.tag = tag;
-      this.windowingStrategy = windowingStrategy;
-      // The view's coder handles an iterable of windowed values, built from the element coder
-      // and the window coder of the strategy's window function.
-      this.coder =
-          IterableCoder.of(WindowedValue.getFullCoder(
-              valueCoder, windowingStrategy.getWindowFn().windowCoder()));
-    }
-
-    /**
-     * Call this constructor to initialize the fields for which this base class provides
-     * boilerplate accessors, with an auto-generated tag.
-     */
-    protected PCollectionViewBase(
-        Pipeline pipeline,
-        WindowingStrategy<?, W> windowingStrategy,
-        Coder<ElemT> valueCoder) {
-      this(pipeline, new TupleTag<Iterable<WindowedValue<ElemT>>>(), windowingStrategy, valueCoder);
-    }
-
-    /**
-     * For serialization only. Do not use directly. Subclasses should call from their own
-     * protected no-argument constructor.
-     */
-    @SuppressWarnings("unused")  // used for serialization
-    protected PCollectionViewBase() {
-      super();
-    }
-
-    /**
-     * Converts the untyped elements to the value of the view by delegating to
-     * {@link #fromElements}.
-     */
-    @Override
-    public ViewT fromIterableInternal(Iterable<WindowedValue<?>> elements) {
-      // Safe cast: it is required that the rest of the SDK maintain the invariant
-      // that a PCollectionView is only provided an iterable for the elements of an
-      // appropriately typed PCollection.
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      Iterable<WindowedValue<ElemT>> typedElements = (Iterable) elements;
-      return fromElements(typedElements);
-    }
-
-    /**
-     * Returns a unique {@link TupleTag} identifying this {@link PCollectionView}.
-     *
-     * <p>For internal use only by runner implementors.
-     */
-    @Override
-    public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
-      // Safe cast: It is required that the rest of the SDK maintain the invariant that
-      // this tag is only used to access the contents of an appropriately typed underlying
-      // PCollection
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      TupleTag<Iterable<WindowedValue<?>>> untypedTag = (TupleTag) tag;
-      return untypedTag;
-    }
-
-    /**
-     * Returns the {@link WindowingStrategy} of this {@link PCollectionView}, which should
-     * be that of the underlying {@link PCollection}.
-     *
-     * <p>For internal use only by runner implementors.
-     */
-    @Override
-    public WindowingStrategy<?, ?> getWindowingStrategyInternal() {
-      return windowingStrategy;
-    }
-
-    /** Returns the coder for the windowed elements underlying this view, in untyped form. */
-    @Override
-    public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
-      // Safe cast: It is required that the rest of the SDK only use this untyped coder
-      // for the elements of an appropriately typed underlying PCollection.
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      Coder<Iterable<WindowedValue<?>>> untypedCoder = (Coder) coder;
-      return untypedCoder;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(tag);
-    }
-
-    /**
-     * Compares this view to another object by tag.
-     *
-     * @return {@code true} if {@code other} is a {@link PCollectionView} whose internal tag
-     *     equals this view's tag
-     */
-    @Override
-    public boolean equals(Object other) {
-      // instanceof is already false for null, so no separate null check is needed.
-      if (!(other instanceof PCollectionView)) {
-        return false;
-      }
-      PCollectionView<?> otherView = (PCollectionView<?>) other;
-      // Objects.equals tolerates an unset tag, consistent with Objects.hash(tag) in hashCode().
-      return Objects.equals(tag, otherView.getTagInternal());
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this).add("tag", tag).toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PTuple.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PTuple.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PTuple.java
deleted file mode 100644
index 5b87b5c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PTuple.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * A {@code PTuple} is an immutable tuple of
- * heterogeneously-typed values, "keyed" by {@link TupleTag}s.
- *
- * <p>PTuples can be created and accessed as follows:
- * <pre> {@code
- * String v1 = ...;
- * Integer v2 = ...;
- * Iterable<String> v3 = ...;
- *
- * // Create TupleTags for each of the values to put in the
- * // PTuple (the type of the TupleTag enables tracking the
- * // static type of each of the values in the PTuple):
- * TupleTag<String> tag1 = new TupleTag<>();
- * TupleTag<Integer> tag2 = new TupleTag<>();
- * TupleTag<Iterable<String>> tag3 = new TupleTag<>();
- *
- * // Create a PTuple with three values:
- * PTuple povs =
- *     PTuple.of(tag1, v1)
- *         .and(tag2, v2)
- *         .and(tag3, v3);
- *
- * // Create an empty PTuple:
- * PTuple povs2 = PTuple.empty();
- *
- * // Get values out of a PTuple, using the same tags
- * // that were used to put them in:
- * Integer vX = povs.get(tag2);
- * String vY = povs.get(tag1);
- * Iterable<String> vZ = povs.get(tag3);
- *
- * // Get a map of all values in a PTuple:
- * Map<TupleTag<?>, ?> allVs = povs.getAll();
- * } </pre>
- */
-public class PTuple {
-  /**
-   * Returns an empty PTuple.
-   *
-   * <p>Longer PTuples can be created by calling
-   * {@link #and} on the result.
-   */
-  public static PTuple empty() {
-    return new PTuple();
-  }
-
-  /**
-   * Returns a singleton PTuple containing the given
-   * value keyed by the given TupleTag.
-   *
-   * <p>Longer PTuples can be created by calling
-   * {@link #and} on the result.
-   */
-  public static <V> PTuple of(TupleTag<V> tag, V value) {
-    return empty().and(tag, value);
-  }
-
-  /**
-   * Returns a new PTuple that has all the values and
-   * tags of this PTuple plus the given value and tag.
-   * This PTuple is left unchanged.
-   *
-   * <p>The given TupleTag should not already be mapped to a
-   * value in this PTuple.
-   */
-  public <V> PTuple and(TupleTag<V> tag, V value) {
-    // Copy into a fresh insertion-ordered map so that this tuple stays untouched.
-    Map<TupleTag<?>, Object> newMap = new LinkedHashMap<TupleTag<?>, Object>(valueMap);
-    newMap.put(tag, value);
-    return new PTuple(newMap);
-  }
-
-  /**
-   * Returns whether this PTuple contains a value with
-   * the given tag.
-   */
-  public <V> boolean has(TupleTag<V> tag) {
-    return valueMap.containsKey(tag);
-  }
-
-  /**
-   * Returns true if this {@code PTuple} is empty.
-   */
-  public boolean isEmpty() {
-    return valueMap.isEmpty();
-  }
-
-  /**
-   * Returns the value with the given tag in this
-   * PTuple.
-   *
-   * @throws IllegalArgumentException if there is no
-   *     such value, i.e., {@code !has(tag)}.
-   */
-  public <V> V get(TupleTag<V> tag) {
-    if (!has(tag)) {
-      throw new IllegalArgumentException(
-          "TupleTag not found in this PTuple");
-    }
-    // Relies on each value having been stored under a tag of its own static type.
-    @SuppressWarnings("unchecked")
-    V value = (V) valueMap.get(tag);
-    return value;
-  }
-
-  /**
-   * Returns an immutable Map from TupleTag to corresponding
-   * value, for all the members of this PTuple.
-   */
-  public Map<TupleTag<?>, ?> getAll() {
-    return valueMap;
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal details below here.
-
-  /** Unmodifiable view of the tag-to-value mapping of this tuple. */
-  private final Map<TupleTag<?>, ?> valueMap;
-
-  private PTuple() {
-    this(new LinkedHashMap<TupleTag<?>, Object>());
-  }
-
-  /**
-   * Wraps the given map without copying it; callers must hand over a map
-   * that nothing else retains a reference to.
-   */
-  private PTuple(Map<TupleTag<?>, ?> valueMap) {
-    this.valueMap = Collections.unmodifiableMap(valueMap);
-  }
-
-  /**
-   * Returns a PTuple with each of the given tags mapping
-   * to the corresponding value. The given map is copied, so
-   * later changes to it do not affect the returned PTuple.
-   *
-   * <p>For internal use only.
-   */
-  public static PTuple ofInternal(Map<TupleTag<?>, ?> valueMap) {
-    // Defensive copy: wrapping the caller's map directly would let the caller
-    // mutate this supposedly immutable tuple through the original reference.
-    return new PTuple(new LinkedHashMap<TupleTag<?>, Object>(valueMap));
-  }
-}