You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:35 UTC
[11/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
deleted file mode 100644
index 96629b1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.MapCoder;
-import com.google.cloud.dataflow.sdk.coders.SetCoder;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.cloud.dataflow.sdk.util.state.StateTags;
-import com.google.cloud.dataflow.sdk.util.state.ValueState;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * An {@link ActiveWindowSet} for merging {@link WindowFn} implementations.
- *
- * <p>The underlying notion of {@link MergingActiveWindowSet} is that of representing equivalence
- * classes of merged windows as a mapping from the merged "super-window" to a set of
- * <i>state address</i> windows in which some state has been persisted. The mapping need not
- * contain EPHEMERAL windows, because they are created and merged without any persistent state.
- * Each window must be a state address window for at most one window, so the mapping is
- * invertible.
- *
- * <p>The states of a non-expired window are treated as follows:
- *
- * <ul>
- * <li><b>NEW</b>: a NEW window has an empty set of associated state address windows.</li>
- * <li><b>ACTIVE</b>: an ACTIVE window will be associated with some nonempty set of state
- * address windows. If the window has not merged, this will necessarily be the singleton set
- * containing just itself, but it is not required that an ACTIVE window be amongst its
- * state address windows.</li>
- * <li><b>MERGED</b>: a MERGED window will be in the set of associated windows for some
- * other window - that window is retrieved via {@link #representative} (this reverse
- * association is implemented in O(1) time).</li>
- * <li><b>EPHEMERAL</b>: EPHEMERAL windows are not persisted but are tracked transiently;
- * an EPHEMERAL window must be registered with this {@link ActiveWindowSet} by a call
- * to {@link #recordMerge} prior to any request for a {@link #representative}.</li>
- * </ul>
- *
- * <p>To illustrate why an ACTIVE window need not be amongst its own state address windows,
- * consider two active windows W1 and W2 that are merged to form W12. Further writes may be
- * applied to either of W1 or W2, since a read of W12 implies reading both W1 and W2 and
- * merging their results. Hence W12 need not have state directly associated with it.
- */
-public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
- private final WindowFn<Object, W> windowFn;
- private final Map<W, Set<W>> activeWindowToStateAddressWindows;
-
- /**
- * As above, but only for EPHEMERAL windows. Does not need to be persisted.
- */
- private final Map<W, Set<W>> activeWindowToEphemeralWindows;
-
- /**
- * A map from window to the ACTIVE window it has been merged into.
- *
- * <p>Does not need to be persisted.
- *
- * <ul>
- * <li>Key window may be ACTIVE, MERGED or EPHEMERAL.
- * <li>ACTIVE windows map to themselves.
- * <li>If W1 maps to W2 then W2 is in {@link #activeWindowToStateAddressWindows}.
- * <li>If W1 = W2 then W1 is ACTIVE. If W1 is in the state address window set for W2 then W1 is
- * MERGED. Otherwise W1 is EPHEMERAL.
- * </ul>
- */
- private final Map<W, W> windowToActiveWindow;
-
- /**
- * Deep clone of {@link #activeWindowToStateAddressWindows} as of last commit.
- *
- * <p>Used to avoid writing to state if no changes have been made during the work unit.
- */
- private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
-
- /**
- * Handle representing our state in the backend.
- */
- private final ValueState<Map<W, Set<W>>> valueState;
-
-  /**
-   * Creates the active window set for {@code windowFn}, reading the persisted merge tree from
-   * {@code state} and deriving the in-memory indexes from it.
-   */
-  public MergingActiveWindowSet(WindowFn<Object, W> windowFn, StateInternals<?> state) {
-    this.windowFn = windowFn;
-
-    MapCoder<W, Set<W>> treeCoder =
-        MapCoder.of(windowFn.windowCoder(), SetCoder.of(windowFn.windowCoder()));
-    StateTag<Object, ValueState<Map<W, Set<W>>>> treeTag =
-        StateTags.makeSystemTagInternal(StateTags.value("tree", treeCoder));
-    valueState = state.state(StateNamespaces.global(), treeTag);
-
-    // Read eagerly: there is little use prefetching this state since the ReduceFnRunner is
-    // stymied until it is available.
-    Map<W, Set<W>> persisted = emptyIfNull(valueState.read());
-    activeWindowToStateAddressWindows = persisted;
-    originalActiveWindowToStateAddressWindows = deepCopy(persisted);
-    windowToActiveWindow = invert(persisted);
-    activeWindowToEphemeralWindows = new HashMap<>();
-  }
-
-  /**
-   * Forgets every EPHEMERAL window: drops each one from the window-to-active-window index and
-   * then empties the EPHEMERAL bookkeeping map.
-   */
-  @Override
-  public void removeEphemeralWindows() {
-    for (Set<W> ephemerals : activeWindowToEphemeralWindows.values()) {
-      windowToActiveWindow.keySet().removeAll(ephemerals);
-    }
-    activeWindowToEphemeralWindows.clear();
-  }
-
- @Override
- public void persist() {
- if (activeWindowToStateAddressWindows.isEmpty()) {
- // Force all persistent state to disappear.
- valueState.clear();
- return;
- }
- if (activeWindowToStateAddressWindows.equals(originalActiveWindowToStateAddressWindows)) {
- // No change.
- return;
- }
- // All NEW windows must have been accounted for.
- for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
- Preconditions.checkState(
- !entry.getValue().isEmpty(), "Cannot persist NEW window %s", entry.getKey());
- }
- // Should be no EPHEMERAL windows.
- Preconditions.checkState(
- activeWindowToEphemeralWindows.isEmpty(), "Unexpected EPHEMERAL windows before persist");
-
- valueState.write(activeWindowToStateAddressWindows);
- // No need to update originalActiveWindowToStateAddressWindows since this object is about to
- // become garbage.
- }
-
- @Override
- @Nullable
- public W representative(W window) {
- return windowToActiveWindow.get(window);
- }
-
-  /**
-   * Returns the key set of the active-window map. This is the map's own key-set view, not a
-   * copy, and it includes windows that are still NEW (mapped to an empty set).
-   */
-  @Override
-  public Set<W> getActiveWindows() {
-    Set<W> activeWindows = activeWindowToStateAddressWindows.keySet();
-    return activeWindows;
-  }
-
- @Override
- public boolean isActive(W window) {
- return activeWindowToStateAddressWindows.containsKey(window);
- }
-
-  /**
-   * Registers {@code window} as NEW, i.e. with an empty set of state address windows, unless
-   * the window-to-active-window index already has an entry for it.
-   */
-  @Override
-  public void addNew(W window) {
-    if (windowToActiveWindow.containsKey(window)) {
-      // Already has a representative; leave the existing bookkeeping alone.
-      return;
-    }
-    activeWindowToStateAddressWindows.put(window, new LinkedHashSet<W>());
-  }
-
-  /**
-   * Registers {@code window} as ACTIVE with itself as its only state address window, unless
-   * the window-to-active-window index already has an entry for it.
-   */
-  @Override
-  public void addActive(W window) {
-    if (windowToActiveWindow.containsKey(window)) {
-      // Already has a representative; leave the existing bookkeeping alone.
-      return;
-    }
-    Set<W> selfOnly = new LinkedHashSet<>();
-    selfOnly.add(window);
-    activeWindowToStateAddressWindows.put(window, selfOnly);
-    windowToActiveWindow.put(window, window);
-  }
-
-  /**
-   * Removes {@code window} from the active-window map together with the index entries of its
-   * state address windows, its EPHEMERAL windows and itself. Does nothing if {@code window}
-   * is not a key of the active-window map.
-   */
-  @Override
-  public void remove(W window) {
-    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
-    if (stateAddressWindows != null) {
-      windowToActiveWindow.keySet().removeAll(stateAddressWindows);
-      activeWindowToStateAddressWindows.remove(window);
-
-      Set<W> ephemeralWindows = activeWindowToEphemeralWindows.remove(window);
-      if (ephemeralWindows != null) {
-        windowToActiveWindow.keySet().removeAll(ephemeralWindows);
-      }
-      windowToActiveWindow.remove(window);
-    }
-    // Otherwise the window is no longer active and there is nothing to do.
-  }
-
- /**
- * The {@link WindowFn.MergeContext} handed to {@link WindowFn#mergeWindows}.
- *
- * <p>Each {@link #merge} request is validated and queued rather than applied immediately;
- * {@link #recordMerges} later invokes the {@link MergeCallback} for every queued request and
- * applies it to the enclosing window set.
- */
- private class MergeContextImpl extends WindowFn<Object, W>.MergeContext {
- private MergeCallback<W> mergeCallback;
- // The three lists below are parallel: index i describes the i-th queued merge request.
- private final List<Collection<W>> allToBeMerged;
- // Subset of the corresponding allToBeMerged entry whose state address set is non-empty.
- private final List<Collection<W>> allActiveToBeMerged;
- private final List<W> allMergeResults;
- // Every window named in any merge request so far; used to reject duplicates.
- private final Set<W> seen;
-
- public MergeContextImpl(MergeCallback<W> mergeCallback) {
- windowFn.super();
- this.mergeCallback = mergeCallback;
- allToBeMerged = new ArrayList<>();
- allActiveToBeMerged = new ArrayList<>();
- allMergeResults = new ArrayList<>();
- seen = new HashSet<>();
- }
-
- /** Returns the key set of the enclosing active-window map (the map's own view, not a copy). */
- @Override
- public Collection<W> windows() {
- return activeWindowToStateAddressWindows.keySet();
- }
-
- /**
- * Validates and queues a request to merge {@code toBeMerged} into {@code mergeResult}.
- *
- * <p>Every window to be merged must be non-null, a key of the active-window map, and must not
- * have appeared in an earlier request on this context. Unless {@code mergeResult} is itself
- * one of {@code toBeMerged}, it must not already be a key of the active-window map.
- */
- @Override
- public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
- // The arguments have come from userland.
- Preconditions.checkNotNull(toBeMerged);
- Preconditions.checkNotNull(mergeResult);
- List<W> copyOfToBeMerged = new ArrayList<>(toBeMerged.size());
- List<W> activeToBeMerged = new ArrayList<>(toBeMerged.size());
- boolean includesMergeResult = false;
- for (W window : toBeMerged) {
- Preconditions.checkNotNull(window);
- Preconditions.checkState(
- isActive(window), "Expecting merge window %s to be active", window);
- if (window.equals(mergeResult)) {
- includesMergeResult = true;
- }
- boolean notDup = seen.add(window);
- Preconditions.checkState(
- notDup, "Expecting merge window %s to appear in at most one merge set", window);
- copyOfToBeMerged.add(window);
- // Windows with an empty state address set are still NEW and are left out of the
- // "active" subset passed to the callback.
- if (!activeWindowToStateAddressWindows.get(window).isEmpty()) {
- activeToBeMerged.add(window);
- }
- }
- if (!includesMergeResult) {
- Preconditions.checkState(
- !isActive(mergeResult), "Expecting result window %s to be new", mergeResult);
- }
- allToBeMerged.add(copyOfToBeMerged);
- allActiveToBeMerged.add(activeToBeMerged);
- allMergeResults.add(mergeResult);
- }
-
- /**
- * Applies all queued merge requests and then empties the queue.
- *
- * <p>First calls {@code prefetchOnMerge} for every request, then for each request in order
- * calls {@code onMerge} followed by the enclosing set's {@code recordMerge}.
- */
- public void recordMerges() throws Exception {
- for (int i = 0; i < allToBeMerged.size(); i++) {
- mergeCallback.prefetchOnMerge(
- allToBeMerged.get(i), allActiveToBeMerged.get(i), allMergeResults.get(i));
- }
- for (int i = 0; i < allToBeMerged.size(); i++) {
- mergeCallback.onMerge(
- allToBeMerged.get(i), allActiveToBeMerged.get(i), allMergeResults.get(i));
- recordMerge(allToBeMerged.get(i), allMergeResults.get(i));
- }
- allToBeMerged.clear();
- allActiveToBeMerged.clear();
- allMergeResults.clear();
- seen.clear();
- }
- }
-
-  /**
-   * Asks the {@link WindowFn} which windows to merge, applies the requested merges (invoking
-   * {@code mergeCallback} for each), and finally turns every window that is still NEW into an
-   * ACTIVE window whose only state address window is itself.
-   */
-  @Override
-  public void merge(MergeCallback<W> mergeCallback) throws Exception {
-    MergeContextImpl mergeContext = new MergeContextImpl(mergeCallback);
-
-    // Entering userland: see what the window function does with the NEW and already ACTIVE
-    // windows.
-    windowFn.mergeWindows(mergeContext);
-
-    // Actually do the merging and invoke the callbacks.
-    mergeContext.recordMerges();
-
-    // Any remaining NEW windows should become implicitly ACTIVE.
-    for (Map.Entry<W, Set<W>> activeEntry : activeWindowToStateAddressWindows.entrySet()) {
-      Set<W> stateAddressWindows = activeEntry.getValue();
-      if (!stateAddressWindows.isEmpty()) {
-        continue;
-      }
-      // This window was NEW but since it survived merging must now become ACTIVE.
-      W survivor = activeEntry.getKey();
-      stateAddressWindows.add(survivor);
-      windowToActiveWindow.put(survivor, survivor);
-    }
-  }
-
- /**
- * A {@link WindowFn#mergeWindows} call has determined that {@code toBeMerged} (each of which
- * must be a key of the active-window map, i.e. ACTIVE or NEW) should be considered equivalent
- * to {@code mergeResult} (which is either a member of {@code toBeMerged} or is a new window).
- * Make the corresponding change in the active window set.
- *
- * <p>On return {@code mergeResult} is ACTIVE, every merged window, state address window and
- * EPHEMERAL window involved maps to it in the window-to-active-window index, and its state
- * address set has been reduced to a single window by {@link #merged}.
- *
- * @throws IllegalStateException if a window in {@code toBeMerged} is not a key of the
- * active-window map
- */
- private void recordMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
- Set<W> newStateAddressWindows = new LinkedHashSet<>();
- Set<W> existingStateAddressWindows = activeWindowToStateAddressWindows.get(mergeResult);
- if (existingStateAddressWindows != null) {
- // Preserve all the existing state address windows for mergeResult.
- newStateAddressWindows.addAll(existingStateAddressWindows);
- }
-
- Set<W> newEphemeralWindows = new HashSet<>();
- Set<W> existingEphemeralWindows = activeWindowToEphemeralWindows.get(mergeResult);
- if (existingEphemeralWindows != null) {
- // Preserve all the existing EPHEMERAL windows for mergeResult.
- newEphemeralWindows.addAll(existingEphemeralWindows);
- }
-
- for (W other : toBeMerged) {
- Set<W> otherStateAddressWindows = activeWindowToStateAddressWindows.get(other);
- Preconditions.checkState(otherStateAddressWindows != null, "Window %s is not ACTIVE", other);
-
- for (W otherStateAddressWindow : otherStateAddressWindows) {
- // Since otherStateAddressWindow equiv other AND other equiv mergeResult
- // THEN otherStateAddressWindow equiv mergeResult.
- newStateAddressWindows.add(otherStateAddressWindow);
- windowToActiveWindow.put(otherStateAddressWindow, mergeResult);
- }
- // Note: when other equals mergeResult this removes mergeResult's entry too; it is put back
- // with the combined set after the loop.
- activeWindowToStateAddressWindows.remove(other);
-
- Set<W> otherEphemeralWindows = activeWindowToEphemeralWindows.get(other);
- if (otherEphemeralWindows != null) {
- for (W otherEphemeral : otherEphemeralWindows) {
- // Since otherEphemeral equiv other AND other equiv mergeResult
- // THEN otherEphemeral equiv mergeResult.
- newEphemeralWindows.add(otherEphemeral);
- windowToActiveWindow.put(otherEphemeral, mergeResult);
- }
- }
- activeWindowToEphemeralWindows.remove(other);
-
- // Now other equiv mergeResult.
- if (otherStateAddressWindows.contains(other)) {
- // Other was ACTIVE and is now known to be MERGED.
- } else if (otherStateAddressWindows.isEmpty()) {
- // Other was NEW thus has no state. It is now EPHEMERAL.
- // NOTE(review): this branch is also taken when other equals mergeResult and was NEW, which
- // records mergeResult in its own EPHEMERAL set - confirm that this is intended.
- newEphemeralWindows.add(other);
- } else if (other.equals(mergeResult)) {
- // Other was ACTIVE, was never used to store elements, but is still ACTIVE.
- // Leave it as active.
- } else {
- // Other was ACTIVE, was never used to store elements, and is no longer considered ACTIVE.
- // It is now EPHEMERAL.
- newEphemeralWindows.add(other);
- }
- windowToActiveWindow.put(other, mergeResult);
- }
-
- if (newStateAddressWindows.isEmpty()) {
- // If newStateAddressWindows is empty then no window involved had a state address window.
- // Promote mergeResult to be active now, using itself as its state address window.
- newStateAddressWindows.add(mergeResult);
- }
- windowToActiveWindow.put(mergeResult, mergeResult);
-
- activeWindowToStateAddressWindows.put(mergeResult, newStateAddressWindows);
- if (!newEphemeralWindows.isEmpty()) {
- activeWindowToEphemeralWindows.put(mergeResult, newEphemeralWindows);
- }
-
- // Reduce mergeResult's state address windows to the first one.
- merged(mergeResult);
- }
-
-  /**
-   * Reduces the state address windows of {@code window} to just the first one in the set's
-   * iteration order.
-   *
-   * <p>If {@code window} is still NEW (its state address set is empty) the set is left
-   * untouched. Previously the {@code null} default returned for an empty set was added back,
-   * leaving a {@code null} state address window in the set.
-   *
-   * <p>NOTE(review): presumably callers invoke this once the state of the remaining address
-   * windows has been consolidated into the first one - confirm against the callers.
-   *
-   * @throws IllegalStateException if {@code window} is not a key of the active-window map
-   */
-  @Override
-  public void merged(W window) {
-    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
-    Preconditions.checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window);
-    W first = Iterables.getFirst(stateAddressWindows, null);
-    if (first == null) {
-      // Nothing to collapse; do not plant a null element in the set.
-      return;
-    }
-    stateAddressWindows.clear();
-    stateAddressWindows.add(first);
-  }
-
- /**
- * Return the state address windows for ACTIVE {@code window} from which all state associated
- * should
- * be read and merged.
- */
- @Override
- public Set<W> readStateAddresses(W window) {
- Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
- Preconditions.checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window);
- return stateAddressWindows;
- }
-
-  /**
-   * Returns the state address window of ACTIVE {@code window} into which all new state should
-   * be written: the first of its state address windows.
-   *
-   * @throws IllegalStateException if {@code window} is not a key of the active-window map, or
-   *     if it has no state address window yet (it is still NEW)
-   */
-  @Override
-  public W writeStateAddress(W window) {
-    Set<W> addresses = activeWindowToStateAddressWindows.get(window);
-    Preconditions.checkState(addresses != null, "Window %s is not ACTIVE", window);
-    W writeAddress = Iterables.getFirst(addresses, null);
-    Preconditions.checkState(writeAddress != null, "Window %s is still NEW", window);
-    return writeAddress;
-  }
-
-  /**
-   * Returns the first state address window of {@code mergeResult} if it has any; otherwise the
-   * first state address window of the first window in {@code toBeMerged} that has any;
-   * otherwise {@code mergeResult} itself.
-   */
-  @Override
-  public W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult) {
-    Set<W> chosen = activeWindowToStateAddressWindows.get(mergeResult);
-    if (chosen == null || chosen.isEmpty()) {
-      chosen = null;
-      for (W candidate : toBeMerged) {
-        Set<W> candidateAddresses = activeWindowToStateAddressWindows.get(candidate);
-        if (candidateAddresses != null && !candidateAddresses.isEmpty()) {
-          chosen = candidateAddresses;
-          break;
-        }
-      }
-    }
-    if (chosen == null) {
-      // Nobody involved has a state address window yet.
-      return mergeResult;
-    }
-    return Iterables.getFirst(chosen, null);
-  }
-
- /**
- * Checks that the three internal maps are mutually consistent.
- *
- * @throws IllegalStateException on the first violated invariant
- */
- @VisibleForTesting
- public void checkInvariants() {
- // Shared across the first two loops so that a window cannot be a state address and/or
- // EPHEMERAL window of more than one ACTIVE window.
- Set<W> knownStateAddressWindows = new HashSet<>();
- // Every ACTIVE window has a non-empty state address set, and each state address window
- // maps back to that ACTIVE window.
- for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
- W active = entry.getKey();
- Preconditions.checkState(!entry.getValue().isEmpty(),
- "Unexpected empty state address window set for ACTIVE window %s", active);
- for (W stateAddressWindow : entry.getValue()) {
- Preconditions.checkState(knownStateAddressWindows.add(stateAddressWindow),
- "%s is in more than one state address window set", stateAddressWindow);
- Preconditions.checkState(active.equals(windowToActiveWindow.get(stateAddressWindow)),
- "%s should have %s as its ACTIVE window", stateAddressWindow, active);
- }
- }
- // Every EPHEMERAL set is non-empty, belongs to an ACTIVE window, and each EPHEMERAL window
- // maps back to that ACTIVE window.
- for (Map.Entry<W, Set<W>> entry : activeWindowToEphemeralWindows.entrySet()) {
- W active = entry.getKey();
- Preconditions.checkState(activeWindowToStateAddressWindows.containsKey(active),
- "%s must be ACTIVE window", active);
- Preconditions.checkState(
- !entry.getValue().isEmpty(), "Unexpected empty EPHEMERAL set for %s", active);
- for (W ephemeralWindow : entry.getValue()) {
- Preconditions.checkState(knownStateAddressWindows.add(ephemeralWindow),
- "%s is EPHEMERAL/state address of more than one ACTIVE window", ephemeralWindow);
- Preconditions.checkState(active.equals(windowToActiveWindow.get(ephemeralWindow)),
- "%s should have %s as its ACTIVE window", ephemeralWindow, active);
- }
- }
- // Every representative recorded in the index is itself ACTIVE.
- for (Map.Entry<W, W> entry : windowToActiveWindow.entrySet()) {
- Preconditions.checkState(activeWindowToStateAddressWindows.containsKey(entry.getValue()),
- "%s should be ACTIVE since representative for %s", entry.getValue(), entry.getKey());
- }
- }
-
- /**
- * Returns a multi-line description of this set: one line per NEW window, and for each ACTIVE
- * window a header line followed by one line per state address window and per EPHEMERAL window.
- *
- * <p>NOTE(review): this method validates the window-to-active-window index while printing
- * and so can throw (see the {@code checkState} below) - confirm that is acceptable for a
- * {@code toString}.
- */
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("MergingActiveWindowSet {\n");
- for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
- W active = entry.getKey();
- Set<W> stateAddressWindows = entry.getValue();
- if (stateAddressWindows.isEmpty()) {
- // No state address windows yet: the window is NEW.
- sb.append(" NEW ");
- sb.append(active);
- sb.append('\n');
- } else {
- sb.append(" ACTIVE ");
- sb.append(active);
- sb.append(":\n");
- for (W stateAddressWindow : stateAddressWindows) {
- // A state address window equal to its ACTIVE window is the window itself; any other
- // one is a window that was MERGED into it.
- if (stateAddressWindow.equals(active)) {
- sb.append(" ACTIVE ");
- } else {
- sb.append(" MERGED ");
- }
- sb.append(stateAddressWindow);
- sb.append("\n");
- W active2 = windowToActiveWindow.get(stateAddressWindow);
- // Throws NullPointerException if the index has no entry for stateAddressWindow, and
- // IllegalStateException if the entry names a different ACTIVE window.
- Preconditions.checkState(active2.equals(active));
- }
- Set<W> ephemeralWindows = activeWindowToEphemeralWindows.get(active);
- if (ephemeralWindows != null) {
- for (W ephemeralWindow : ephemeralWindows) {
- sb.append(" EPHEMERAL ");
- sb.append(ephemeralWindow);
- sb.append('\n');
- }
- }
- }
- }
- sb.append("}");
- return sb.toString();
- }
-
- // ======================================================================
-
-  /**
-   * Normalizes a possibly-null multimap: a null {@code multimap} becomes a fresh empty map,
-   * and any null value inside a non-null {@code multimap} is replaced, in place, by an empty
-   * set. A non-null argument is returned as the same instance.
-   */
-  private static <W> Map<W, Set<W>> emptyIfNull(@Nullable Map<W, Set<W>> multimap) {
-    if (multimap == null) {
-      return new HashMap<>();
-    }
-    for (Map.Entry<W, Set<W>> slot : multimap.entrySet()) {
-      if (slot.getValue() != null) {
-        continue;
-      }
-      slot.setValue(new LinkedHashSet<W>());
-    }
-    return multimap;
-  }
-
-  /**
-   * Returns a copy of {@code multimap} in which the map and each of its value sets are new
-   * instances; the windows themselves are shared with the original.
-   */
-  private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>> multimap) {
-    Map<W, Set<W>> copy = new HashMap<>();
-    for (W key : multimap.keySet()) {
-      Set<W> copiedValues = new LinkedHashSet<>(multimap.get(key));
-      copy.put(key, copiedValues);
-    }
-    return copy;
-  }
-
-  /**
-   * Returns the inverse of {@code multimap}: a map from each value-set member to the key whose
-   * set contains it.
-   *
-   * @throws IllegalStateException if some member appears under more than one key, i.e. the
-   *     multimap is not invertible
-   */
-  private static <W> Map<W, W> invert(Map<W, Set<W>> multimap) {
-    Map<W, W> inverse = new HashMap<>();
-    for (Map.Entry<W, Set<W>> forward : multimap.entrySet()) {
-      W representative = forward.getKey();
-      for (W member : forward.getValue()) {
-        W earlier = inverse.put(member, representative);
-        boolean firstSighting = earlier == null;
-        Preconditions.checkState(firstSighting,
-            "Window %s has both %s and %s as representatives", member, earlier, representative);
-      }
-    }
-    return inverse;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MimeTypes.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MimeTypes.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MimeTypes.java
deleted file mode 100644
index 489d183..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MimeTypes.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-/** Constants representing various mime types. */
-public class MimeTypes {
- /** Mime type string for plain text. */
- public static final String TEXT = "text/plain";
- /** Mime type string for arbitrary binary data. */
- public static final String BINARY = "application/octet-stream";
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java
deleted file mode 100644
index d450187..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudTime;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.ListJobMessagesResponse;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * A helper class for monitoring jobs submitted to the service.
- */
-public final class MonitoringUtil {
-
- private static final String GCLOUD_DATAFLOW_PREFIX = "gcloud alpha dataflow";
- private static final String ENDPOINT_OVERRIDE_ENV_VAR =
- "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW";
-
- private static final Map<String, State> DATAFLOW_STATE_TO_JOB_STATE =
- ImmutableMap
- .<String, State>builder()
- .put("JOB_STATE_UNKNOWN", State.UNKNOWN)
- .put("JOB_STATE_STOPPED", State.STOPPED)
- .put("JOB_STATE_RUNNING", State.RUNNING)
- .put("JOB_STATE_DONE", State.DONE)
- .put("JOB_STATE_FAILED", State.FAILED)
- .put("JOB_STATE_CANCELLED", State.CANCELLED)
- .put("JOB_STATE_UPDATED", State.UPDATED)
- .build();
-
- private String projectId;
- private Messages messagesClient;
-
- /**
- * An interface that can be used for defining callbacks to receive a list
- * of JobMessages containing monitoring information.
- */
- public interface JobMessagesHandler {
- /**
- * Processes a batch of job messages.
- *
- * @param messages the messages to handle
- */
- void process(List<JobMessage> messages);
- }
-
- /** A handler that prints monitoring messages to a stream. */
- public static class PrintHandler implements JobMessagesHandler {
- private PrintStream out;
-
- /**
- * Construct the handler.
- *
- * @param stream The stream to write the messages to.
- */
- public PrintHandler(PrintStream stream) {
- out = stream;
- }
-
- @Override
- public void process(List<JobMessage> messages) {
- for (JobMessage message : messages) {
- if (message.getMessageText() == null || message.getMessageText().isEmpty()) {
- continue;
- }
- String importanceString = null;
- if (message.getMessageImportance() == null) {
- continue;
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
- importanceString = "Error: ";
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_WARNING")) {
- importanceString = "Warning: ";
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_BASIC")) {
- importanceString = "Basic: ";
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_DETAILED")) {
- importanceString = "Detail: ";
- } else {
- // TODO: Remove filtering here once getJobMessages supports minimum
- // importance.
- continue;
- }
- @Nullable Instant time = TimeUtil.fromCloudTime(message.getTime());
- if (time == null) {
- out.print("UNKNOWN TIMESTAMP: ");
- } else {
- out.print(time + ": ");
- }
- if (importanceString != null) {
- out.print(importanceString);
- }
- out.println(message.getMessageText());
- }
- out.flush();
- }
- }
-
- /**
- * Construct a helper for monitoring, using the job messages client obtained from
- * {@code dataflow.projects().jobs().messages()}.
- *
- * @param projectId the project id passed to every job message list request
- * @param dataflow the Dataflow client from which the messages client is obtained
- */
- public MonitoringUtil(String projectId, Dataflow dataflow) {
- this(projectId, dataflow.projects().jobs().messages());
- }
-
- // @VisibleForTesting
- /**
- * Construct a helper for monitoring around an explicit messages client.
- *
- * @param projectId the project id passed to every job message list request
- * @param messagesClient the client used to list job messages
- */
- MonitoringUtil(String projectId, Messages messagesClient) {
- this.projectId = projectId;
- this.messagesClient = messagesClient;
- }
-
-  /**
-   * Comparator for sorting {@link JobMessage}s in increasing order based on timestamp.
-   *
-   * <p>Messages for which {@code fromCloudTime} yields {@code null} sort before all messages
-   * with a timestamp and compare as equal to one another. Treating two such messages as equal
-   * keeps the ordering consistent ({@code compare(a, b)} and {@code compare(b, a)} have
-   * opposite signs, and {@code compare(a, a)} is zero), as {@link Comparator} requires.
-   */
-  public static class TimeStampComparator implements Comparator<JobMessage> {
-    @Override
-    public int compare(JobMessage o1, JobMessage o2) {
-      @Nullable Instant t1 = fromCloudTime(o1.getTime());
-      @Nullable Instant t2 = fromCloudTime(o2.getTime());
-      if (t1 == null) {
-        // Both missing: equal. Only the first missing: it sorts first.
-        return t2 == null ? 0 : -1;
-      }
-      if (t2 == null) {
-        return 1;
-      }
-      return t1.compareTo(t2);
-    }
-  }
-
-  /**
-   * Return job messages sorted in ascending order by timestamp.
-   *
-   * <p>Pages through the message list until there is no next page token, or until a page
-   * comes back with no response or no messages. Messages without a usable timestamp are
-   * dropped. Whatever was collected is always sorted before being returned, including when
-   * paging stops early.
-   *
-   * @param jobId The id of the job to get the messages for.
-   * @param startTimestampMs Return only those messages with a
-   *     timestamp greater than this value.
-   * @return collection of messages
-   * @throws IOException if executing a list request fails
-   */
-  public ArrayList<JobMessage> getJobMessages(
-      String jobId, long startTimestampMs) throws IOException {
-    // TODO: Allow filtering messages by importance
-    Instant startTimestamp = new Instant(startTimestampMs);
-    ArrayList<JobMessage> allMessages = new ArrayList<>();
-    String pageToken = null;
-    while (true) {
-      Messages.List listRequest = messagesClient.list(projectId, jobId);
-      if (pageToken != null) {
-        listRequest.setPageToken(pageToken);
-      }
-      ListJobMessagesResponse response = listRequest.execute();
-
-      if (response == null || response.getJobMessages() == null) {
-        // Stop paging, but fall through to the sort: earlier pages may already have
-        // contributed messages, and the result must still be ordered.
-        break;
-      }
-
-      for (JobMessage m : response.getJobMessages()) {
-        @Nullable Instant timestamp = fromCloudTime(m.getTime());
-        if (timestamp == null) {
-          continue;
-        }
-        if (timestamp.isAfter(startTimestamp)) {
-          allMessages.add(m);
-        }
-      }
-
-      pageToken = response.getNextPageToken();
-      if (pageToken == null) {
-        break;
-      }
-    }
-
-    Collections.sort(allMessages, new TimeStampComparator());
-    return allMessages;
-  }
-
-  /**
-   * Returns the Developers Console URL of the monitoring page for a job, with both arguments
-   * URL-encoded as UTF-8.
-   *
-   * <p>Project name is allowed in place of the project id: the user will be redirected to a
-   * URL that has the project name replaced with project id.
-   */
-  public static String getJobMonitoringPageURL(String projectName, String jobId) {
-    final String encodedProject;
-    final String encodedJobId;
-    try {
-      encodedProject = URLEncoder.encode(projectName, "UTF-8");
-      encodedJobId = URLEncoder.encode(jobId, "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      // Should never happen.
-      throw new AssertionError("UTF-8 encoding is not supported by the environment", e);
-    }
-    return String.format(
-        "https://console.developers.google.com/project/%s/dataflow/job/%s",
-        encodedProject,
-        encodedJobId);
-  }
-
- public static String getGcloudCancelCommand(DataflowPipelineOptions options, String jobId) {
-
- // If using a different Dataflow API than default, prefix command with an API override.
- String dataflowApiOverridePrefix = "";
- String apiUrl = options.getDataflowClient().getBaseUrl();
- if (!apiUrl.equals(Dataflow.DEFAULT_BASE_URL)) {
- dataflowApiOverridePrefix = String.format("%s=%s ", ENDPOINT_OVERRIDE_ENV_VAR, apiUrl);
- }
-
- // Assemble cancel command from optional prefix and project/job parameters.
- return String.format("%s%s jobs --project=%s cancel %s",
- dataflowApiOverridePrefix, GCLOUD_DATAFLOW_PREFIX, options.getProject(), jobId);
- }
-
- public static State toState(String stateName) {
- return MoreObjects.firstNonNull(DATAFLOW_STATE_TO_JOB_STATE.get(stateName),
- State.UNKNOWN);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetector.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetector.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetector.java
deleted file mode 100644
index 51e65ab..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetector.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-/**
- * An object for detecting illegal mutations.
- *
- * <p>The {@link AutoCloseable} aspect of this interface allows use in a try-with-resources
- * style, where the implementing class may choose to perform a final mutation check upon
- * {@link #close()}.
- */
-public interface MutationDetector extends AutoCloseable {
- /**
- * @throws IllegalMutationException if illegal mutations are detected.
- */
- void verifyUnmodified();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetectors.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetectors.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetectors.java
deleted file mode 100644
index 412e3eb..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MutationDetectors.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.common.base.Throwables;
-
-import java.util.Arrays;
-import java.util.Objects;
-
-/**
- * Static methods for creating and working with {@link MutationDetector}.
- */
-public class MutationDetectors {
-
- private MutationDetectors() {}
-
- /**
- * Creates a new {@code MutationDetector} for the provided {@code value} that uses the provided
- * {@link Coder} to perform deep copies and comparisons by serializing and deserializing values.
- *
- * <p>It is permissible for {@code value} to be {@code null}. Since {@code null} is immutable,
- * the mutation check will always succeed.
- */
- public static <T> MutationDetector forValueWithCoder(T value, Coder<T> coder)
- throws CoderException {
- if (value == null) {
- return noopMutationDetector();
- } else {
- return new CodedValueMutationDetector<>(value, coder);
- }
- }
-
- /**
- * Creates a new {@code MutationDetector} that always succeeds.
- *
- * <p>This is useful, for example, for providing a very efficient mutation detector for a value
- * which is already immutable by design.
- */
- public static MutationDetector noopMutationDetector() {
- return new NoopMutationDetector();
- }
-
- /**
- * A {@link MutationDetector} that always succeeds, e.g. for {@code null}, which is immutable.
- */
- private static class NoopMutationDetector implements MutationDetector {
-
- @Override
- public void verifyUnmodified() { }
-
- @Override
- public void close() { }
- }
-
- /**
- * Given a value of type {@code T} and a {@link Coder} for that type, saves an encoded copy of
- * the value and provides facilities to check that the value has not changed.
- *
- * @param <T> the type of values checked for mutation
- */
- private static class CodedValueMutationDetector<T> implements MutationDetector {
-
- private final Coder<T> coder;
-
- /**
- * A saved pointer to an in-memory value provided upon construction, which we will check for
- * forbidden mutations.
- */
- private final T possiblyModifiedObject;
-
- /**
- * A saved encoded copy of the same value as {@link #possiblyModifiedObject}. Naturally, it
- * will not change if {@link #possiblyModifiedObject} is mutated.
- */
- private final byte[] encodedOriginalObject;
-
- /**
- * The object decoded from {@link #encodedOriginalObject}. It will be used during every call to
- * {@link #verifyUnmodified}, which could be called many times throughout the lifetime of this
- * {@link CodedValueMutationDetector}.
- */
- private final T clonedOriginalObject;
-
- /**
- * Create a mutation detector for the provided {@code value}, using the provided {@link Coder}
- * for cloning and checking serialized forms for equality.
- */
- public CodedValueMutationDetector(T value, Coder<T> coder) throws CoderException {
- this.coder = coder;
- this.possiblyModifiedObject = value;
- this.encodedOriginalObject = CoderUtils.encodeToByteArray(coder, value);
- this.clonedOriginalObject = CoderUtils.decodeFromByteArray(coder, encodedOriginalObject);
- }
-
- @Override
- public void verifyUnmodified() {
- try {
- verifyUnmodifiedThrowingCheckedExceptions();
- } catch (CoderException exn) {
- Throwables.propagate(exn);
- }
- }
-
- private void verifyUnmodifiedThrowingCheckedExceptions() throws CoderException {
- // If either object believes they are equal, we trust that and short-circuit deeper checks.
- if (Objects.equals(possiblyModifiedObject, clonedOriginalObject)
- || Objects.equals(clonedOriginalObject, possiblyModifiedObject)) {
- return;
- }
-
- // Since possiblyModifiedObject is in general an instance of a subclass of T, when it is
- // cloned to clonedOriginalObject using a Coder<T>, the two will generally be equivalent
- // viewed as a T, but in general neither possiblyModifiedObject.equals(clonedOriginalObject)
- // nor clonedOriginalObject.equals(possiblyModifiedObject) will hold.
- //
- // For example, CoderUtils.clone(IterableCoder<Integer>, IterableSubclass<Integer>) will
- // produce an ArrayList<Integer> with the same contents as the IterableSubclass, but the
- // latter will quite reasonably not consider itself equivalent to an ArrayList (and vice
- // versa).
- //
- // To enable a reasonable comparison, we clone possiblyModifiedObject again, converting it to
- // the same sort of T that the Coder<T> output when it created clonedOriginalObject.
- T clonedPossiblyModifiedObject = CoderUtils.clone(coder, possiblyModifiedObject);
-
- // If deepEquals() then we trust the equals implementation.
- // This deliberately allows fields to escape this check.
- if (Objects.deepEquals(clonedPossiblyModifiedObject, clonedOriginalObject)) {
- return;
- }
-
- // If not deepEquals(), the class may just have a poor equals() implementation.
- // So we next try checking their serialized forms. We re-serialize instead of checking
- // encodedOriginalObject, because the Coder may treat it differently.
- //
- // For example, an unbounded Iterable will be encoded in an unbounded way, but decoded into an
- // ArrayList, which will then be re-encoded in a bounded format. So we really do need to
- // encode-decode-encode possiblyModifiedObject.
- if (Arrays.equals(
- CoderUtils.encodeToByteArray(coder, clonedOriginalObject),
- CoderUtils.encodeToByteArray(coder, clonedPossiblyModifiedObject))) {
- return;
- }
-
- // If we got here, then they are not deepEquals() and do not have deepEquals() encodings.
- // Even if there is some conceptual sense in which the objects are equivalent, it has not
- // been adequately expressed in code.
- illegalMutation(clonedOriginalObject, clonedPossiblyModifiedObject);
- }
-
- private void illegalMutation(T previousValue, T newValue) throws CoderException {
- throw new IllegalMutationException(
- String.format("Value %s mutated illegally, new value was %s."
- + " Encoding was %s, now %s.",
- previousValue, newValue,
- CoderUtils.encodeToBase64(coder, previousValue),
- CoderUtils.encodeToBase64(coder, newValue)),
- previousValue, newValue);
- }
-
- @Override
- public void close() {
- verifyUnmodified();
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonEmptyPanes.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonEmptyPanes.java
deleted file mode 100644
index 1270f01..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonEmptyPanes.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode;
-import com.google.cloud.dataflow.sdk.util.state.AccumulatorCombiningState;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.ReadableState;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateMerging;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.cloud.dataflow.sdk.util.state.StateTags;
-
-/**
- * Tracks which windows have non-empty panes. Specifically, which windows have new elements since
- * their last triggering.
- *
- * @param <W> The kind of windows being tracked.
- */
-public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
-
- static <K, W extends BoundedWindow> NonEmptyPanes<K, W> create(
- WindowingStrategy<?, W> strategy, ReduceFn<K, ?, ?, W> reduceFn) {
- if (strategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES) {
- return new DiscardingModeNonEmptyPanes<>(reduceFn);
- } else {
- return new GeneralNonEmptyPanes<>();
- }
- }
-
- /**
- * Record that some content has been added to the window in {@code context}, and therefore the
- * current pane is not empty.
- */
- public abstract void recordContent(StateAccessor<K> context);
-
- /**
- * Record that the given pane is empty.
- */
- public abstract void clearPane(StateAccessor<K> state);
-
- /**
- * Return true if the current pane for the window in {@code context} is empty.
- */
- public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context);
-
- /**
- * Prefetch in preparation for merging.
- */
- public abstract void prefetchOnMerge(MergingStateAccessor<K, W> state);
-
- /**
- * Eagerly merge backing state.
- */
- public abstract void onMerge(MergingStateAccessor<K, W> context);
-
- /**
- * An implementation of {@code NonEmptyPanes} optimized for use with discarding mode. Uses the
- * presence of data in the accumulation buffer to record non-empty panes.
- */
- private static class DiscardingModeNonEmptyPanes<K, W extends BoundedWindow>
- extends NonEmptyPanes<K, W> {
-
- private ReduceFn<K, ?, ?, W> reduceFn;
-
- private DiscardingModeNonEmptyPanes(ReduceFn<K, ?, ?, W> reduceFn) {
- this.reduceFn = reduceFn;
- }
-
- @Override
- public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
- return reduceFn.isEmpty(state);
- }
-
- @Override
- public void recordContent(StateAccessor<K> state) {
- // Nothing to do -- the reduceFn is tracking contents
- }
-
- @Override
- public void clearPane(StateAccessor<K> state) {
- // Nothing to do -- the reduceFn is tracking contents
- }
-
- @Override
- public void prefetchOnMerge(MergingStateAccessor<K, W> state) {
- // Nothing to do -- the reduceFn is tracking contents
- }
-
- @Override
- public void onMerge(MergingStateAccessor<K, W> context) {
- // Nothing to do -- the reduceFn is tracking contents
- }
- }
-
- /**
- * An implementation of {@code NonEmptyPanes} for general use.
- */
- private static class GeneralNonEmptyPanes<K, W extends BoundedWindow>
- extends NonEmptyPanes<K, W> {
-
- private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
- PANE_ADDITIONS_TAG =
- StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
- "count", VarLongCoder.of(), new Sum.SumLongFn()));
-
- @Override
- public void recordContent(StateAccessor<K> state) {
- state.access(PANE_ADDITIONS_TAG).add(1L);
- }
-
- @Override
- public void clearPane(StateAccessor<K> state) {
- state.access(PANE_ADDITIONS_TAG).clear();
- }
-
- @Override
- public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
- return state.access(PANE_ADDITIONS_TAG).isEmpty();
- }
-
- @Override
- public void prefetchOnMerge(MergingStateAccessor<K, W> state) {
- StateMerging.prefetchCombiningValues(state, PANE_ADDITIONS_TAG);
- }
-
- @Override
- public void onMerge(MergingStateAccessor<K, W> context) {
- StateMerging.mergeCombiningValues(context, PANE_ADDITIONS_TAG);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java
deleted file mode 100644
index cb7f9b0..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.common.collect.ImmutableSet;
-
-import java.util.Collection;
-import java.util.Set;
-
-/**
- * Implementation of {@link ActiveWindowSet} used with {@link WindowFn WindowFns} that don't support
- * merging.
- *
- * @param <W> the types of windows being managed
- */
-public class NonMergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
- @Override
- public void removeEphemeralWindows() {}
-
- @Override
- public void persist() {}
-
- @Override
- public W representative(W window) {
- // Always represented by itself.
- return window;
- }
-
- @Override
- public Set<W> getActiveWindows() {
- // Only supported when merging.
- throw new java.lang.UnsupportedOperationException();
- }
-
- @Override
- public boolean isActive(W window) {
- // Windows should never disappear, since we don't support merging.
- return true;
- }
-
- @Override
- public void addNew(W window) {}
-
- @Override
- public void addActive(W window) {}
-
- @Override
- public void remove(W window) {}
-
- @Override
- public void merge(MergeCallback<W> mergeCallback) throws Exception {}
-
- @Override
- public void merged(W window) {}
-
- @Override
- public Set<W> readStateAddresses(W window) {
- return ImmutableSet.of(window);
- }
-
- @Override
- public W writeStateAddress(W window) {
- return window;
- }
-
- @Override
- public W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult) {
- return mergeResult;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopCredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopCredentialFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopCredentialFactory.java
deleted file mode 100644
index 9ef4c2e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopCredentialFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-
-/**
- * A {@link CredentialFactory} that does not construct an oauth credential for the SDK or the
- * SDK workers. Always returns a null Credential object.
- */
-public class NoopCredentialFactory implements CredentialFactory {
- public static NoopCredentialFactory fromOptions(PipelineOptions options) {
- return new NoopCredentialFactory();
- }
-
- @Override
- public Credential getCredential() throws IOException, GeneralSecurityException {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopPathValidator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopPathValidator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopPathValidator.java
deleted file mode 100644
index 00abbb1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NoopPathValidator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-/**
- * Noop implementation of {@link PathValidator}. All paths are allowed and returned unchanged.
- */
-public class NoopPathValidator implements PathValidator {
-
- private NoopPathValidator() {
- }
-
- public static PathValidator fromOptions(
- @SuppressWarnings("unused") PipelineOptions options) {
- return new NoopPathValidator();
- }
-
- @Override
- public String validateInputFilePatternSupported(String filepattern) {
- return filepattern;
- }
-
- @Override
- public String validateOutputFilePrefixSupported(String filePrefix) {
- return filePrefix;
- }
-
- @Override
- public String verifyPath(String path) {
- return path;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NullSideInputReader.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NullSideInputReader.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NullSideInputReader.java
deleted file mode 100644
index 0fc2646..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/NullSideInputReader.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.common.collect.Sets;
-
-import java.util.Collections;
-import java.util.Set;
-
-/**
- * A {@link SideInputReader} representing a well-defined set of views, but not storing
- * any values for them. Used to check if a side input is present when the data itself
- * comes from elsewhere.
- */
-public class NullSideInputReader implements SideInputReader {
-
- private Set<PCollectionView<?>> views;
-
- public static NullSideInputReader empty() {
- return new NullSideInputReader(Collections.<PCollectionView<?>>emptySet());
- }
-
- public static NullSideInputReader of(Iterable<? extends PCollectionView<?>> views) {
- return new NullSideInputReader(views);
- }
-
- private NullSideInputReader(Iterable<? extends PCollectionView<?>> views) {
- this.views = Sets.newHashSet(views);
- }
-
- @Override
- public <T> T get(PCollectionView<T> view, BoundedWindow window) {
- throw new IllegalArgumentException("cannot call NullSideInputReader.get()");
- }
-
- @Override
- public boolean isEmpty() {
- return views.isEmpty();
- }
-
- @Override
- public <T> boolean contains(PCollectionView<T> view) {
- return views.contains(view);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/OutputReference.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/OutputReference.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/OutputReference.java
deleted file mode 100644
index 096c996..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/OutputReference.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.api.client.util.Preconditions.checkNotNull;
-
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.util.Key;
-
-/**
- * A representation used by {@link com.google.api.services.dataflow.model.Step}s
- * to reference the output of other {@code Step}s.
- */
-public final class OutputReference extends GenericJson {
- @Key("@type")
- public final String type = "OutputReference";
-
- @Key("step_name")
- private final String stepName;
-
- @Key("output_name")
- private final String outputName;
-
- public OutputReference(String stepName, String outputName) {
- this.stepName = checkNotNull(stepName);
- this.outputName = checkNotNull(outputName);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViewWindow.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViewWindow.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViewWindow.java
deleted file mode 100644
index 7cf636e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViewWindow.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-import java.util.Objects;
-
-/**
- * A pair of a {@link PCollectionView} and a {@link BoundedWindow}, which can
- * be thought of as window "of" the view. This is a value class for use e.g.
- * as a compound cache key.
- *
- * @param <T> the type of the underlying PCollectionView
- */
-public final class PCollectionViewWindow<T> {
-
- private final PCollectionView<T> view;
- private final BoundedWindow window;
-
- private PCollectionViewWindow(PCollectionView<T> view, BoundedWindow window) {
- this.view = view;
- this.window = window;
- }
-
- public static <T> PCollectionViewWindow<T> of(PCollectionView<T> view, BoundedWindow window) {
- return new PCollectionViewWindow<>(view, window);
- }
-
- public PCollectionView<T> getView() {
- return view;
- }
-
- public BoundedWindow getWindow() {
- return window;
- }
-
- @Override
- public boolean equals(Object otherObject) {
- if (!(otherObject instanceof PCollectionViewWindow)) {
- return false;
- }
- @SuppressWarnings("unchecked")
- PCollectionViewWindow<T> other = (PCollectionViewWindow<T>) otherObject;
- return getView().equals(other.getView()) && getWindow().equals(other.getWindow());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getView(), getWindow());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViews.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViews.java
deleted file mode 100644
index 7e73547..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PCollectionViews.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PValueBase;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * Implementations of {@link PCollectionView} shared across the SDK.
- *
- * <p>For internal use only, subject to change.
- */
-public class PCollectionViews {
-
- /**
-  * Creates a {@code PCollectionView<T>} over elements that are encoded with the given
-  * {@link Coder} and windowed with the given {@link WindowingStrategy}.
-  *
-  * <p>When {@code hasDefault} is {@code true}, the view takes on the value
-  * {@code defaultValue} for any empty window.
-  */
- public static <T, W extends BoundedWindow> PCollectionView<T> singletonView(
-     Pipeline pipeline,
-     WindowingStrategy<?, W> windowingStrategy,
-     boolean hasDefault,
-     T defaultValue,
-     Coder<T> valueCoder) {
-   SingletonPCollectionView<T, W> view =
-       new SingletonPCollectionView<>(
-           pipeline, windowingStrategy, hasDefault, defaultValue, valueCoder);
-   return view;
- }
-
- /**
-  * Creates a {@code PCollectionView<Iterable<T>>} over elements that are encoded with the
-  * given {@link Coder} and windowed with the given {@link WindowingStrategy}.
-  */
- public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableView(
-     Pipeline pipeline,
-     WindowingStrategy<?, W> windowingStrategy,
-     Coder<T> valueCoder) {
-   IterablePCollectionView<T, W> view =
-       new IterablePCollectionView<>(pipeline, windowingStrategy, valueCoder);
-   return view;
- }
-
- /**
-  * Creates a {@code PCollectionView<List<T>>} over elements that are encoded with the
-  * given {@link Coder} and windowed with the given {@link WindowingStrategy}.
-  */
- public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
-     Pipeline pipeline,
-     WindowingStrategy<?, W> windowingStrategy,
-     Coder<T> valueCoder) {
-   ListPCollectionView<T, W> view =
-       new ListPCollectionView<>(pipeline, windowingStrategy, valueCoder);
-   return view;
- }
-
- /**
-  * Creates a {@code PCollectionView<Map<K, V>>} over key-value elements that are encoded
-  * with the given {@link Coder} and windowed with the given {@link WindowingStrategy}.
-  */
- public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapView(
-     Pipeline pipeline,
-     WindowingStrategy<?, W> windowingStrategy,
-     Coder<KV<K, V>> valueCoder) {
-   MapPCollectionView<K, V, W> view =
-       new MapPCollectionView<>(pipeline, windowingStrategy, valueCoder);
-   return view;
- }
-
- /**
-  * Creates a {@code PCollectionView<Map<K, Iterable<V>>>} over key-value elements that are
-  * encoded with the given {@link Coder} and windowed with the given
-  * {@link WindowingStrategy}.
-  */
- public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> multimapView(
-     Pipeline pipeline,
-     WindowingStrategy<?, W> windowingStrategy,
-     Coder<KV<K, V>> valueCoder) {
-   MultimapPCollectionView<K, V, W> view =
-       new MultimapPCollectionView<>(pipeline, windowingStrategy, valueCoder);
-   return view;
- }
-
- /**
-  * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T}.
-  *
-  * <p>For internal use only.
-  *
-  * <p>Instantiate via {@link PCollectionViews#singletonView}.
-  */
- public static class SingletonPCollectionView<T, W extends BoundedWindow>
-     extends PCollectionViewBase<T, T, W> {
-   /** Encoded form of the default value; unlike {@link #defaultValue} it is not transient. */
-   @Nullable private byte[] encodedDefaultValue;
-   /** Decoded default value; transient, so it is rebuilt from the encoded form on demand. */
-   @Nullable private transient T defaultValue;
-   /** Whether this instance has already decoded {@link #encodedDefaultValue}. */
-   private transient boolean defaultValueDecoded;
-   @Nullable private Coder<T> valueCoder;
-   private boolean hasDefault;
-
-   private SingletonPCollectionView(
-       Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy,
-       boolean hasDefault, T defaultValue, Coder<T> valueCoder) {
-     super(pipeline, windowingStrategy, valueCoder);
-     this.hasDefault = hasDefault;
-     this.defaultValue = defaultValue;
-     this.valueCoder = valueCoder;
-     if (hasDefault) {
-       try {
-         this.encodedDefaultValue = CoderUtils.encodeToByteArray(valueCoder, defaultValue);
-       } catch (IOException e) {
-         throw new RuntimeException("Unexpected IOException: ", e);
-       }
-     }
-   }
-
-   /**
-    * Returns the default value that was specified.
-    *
-    * <p>For internal use only.
-    *
-    * @throws NoSuchElementException if no default was specified.
-    */
-   public T getDefaultValue() {
-     if (!hasDefault) {
-       throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
-     }
-     // Lazily decode the default value once per instance. The encoded bytes are deliberately
-     // retained: defaultValue is transient, so discarding them would leave a view that is
-     // serialized after this call with no way to recover its default.
-     synchronized (this) {
-       if (!defaultValueDecoded && encodedDefaultValue != null) {
-         try {
-           defaultValue = CoderUtils.decodeFromByteArray(valueCoder, encodedDefaultValue);
-           defaultValueDecoded = true;
-         } catch (IOException e) {
-           throw new RuntimeException("Unexpected IOException: ", e);
-         }
-       }
-     }
-     return defaultValue;
-   }
-
-   /**
-    * Returns the only element of {@code contents}, or the default value when
-    * {@code contents} is empty.
-    *
-    * @throws NoSuchElementException if {@code contents} is empty and no default was specified
-    * @throws IllegalArgumentException if {@code contents} has more than one element
-    */
-   @Override
-   protected T fromElements(Iterable<WindowedValue<T>> contents) {
-     try {
-       return Iterables.getOnlyElement(contents).getValue();
-     } catch (NoSuchElementException exc) {
-       return getDefaultValue();
-     } catch (IllegalArgumentException exc) {
-       // Chain the original exception so its details are not lost.
-       throw new IllegalArgumentException(
-           "PCollection with more than one element "
-           + "accessed as a singleton view.", exc);
-     }
-   }
- }
-
- /**
-  * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code Iterable<T>}.
-  *
-  * <p>For internal use only.
-  *
-  * <p>Instantiate via {@link PCollectionViews#iterableView}.
-  */
- public static class IterablePCollectionView<T, W extends BoundedWindow>
-     extends PCollectionViewBase<T, Iterable<T>, W> {
-   private IterablePCollectionView(
-       Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<T> valueCoder) {
-     super(pipeline, windowingStrategy, valueCoder);
-   }
-
-   /** Exposes the value of every windowed element through an unmodifiable iterable. */
-   @Override
-   protected Iterable<T> fromElements(Iterable<WindowedValue<T>> contents) {
-     Function<WindowedValue<T>, T> unwrap =
-         new Function<WindowedValue<T>, T>() {
-           @Override
-           public T apply(WindowedValue<T> windowed) {
-             return windowed.getValue();
-           }
-         };
-     Iterable<T> values = Iterables.transform(contents, unwrap);
-     return Iterables.unmodifiableIterable(values);
-   }
- }
-
- /**
-  * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code List<T>}.
-  *
-  * <p>For internal use only.
-  *
-  * <p>Instantiate via {@link PCollectionViews#listView}.
-  */
- public static class ListPCollectionView<T, W extends BoundedWindow>
-     extends PCollectionViewBase<T, List<T>, W> {
-   private ListPCollectionView(
-       Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<T> valueCoder) {
-     super(pipeline, windowingStrategy, valueCoder);
-   }
-
-   /** Copies the value of every windowed element, in iteration order, into an immutable list. */
-   @Override
-   protected List<T> fromElements(Iterable<WindowedValue<T>> contents) {
-     Function<WindowedValue<T>, T> unwrap =
-         new Function<WindowedValue<T>, T>() {
-           @Override
-           public T apply(WindowedValue<T> windowed) {
-             return windowed.getValue();
-           }
-         };
-     Iterable<T> values = Iterables.transform(contents, unwrap);
-     return ImmutableList.copyOf(values);
-   }
- }
-
- /**
-  * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>}
-  * to {@code Map<K, Iterable<V>>}.
-  *
-  * <p>For internal use only.
-  */
- public static class MultimapPCollectionView<K, V, W extends BoundedWindow>
-     extends PCollectionViewBase<KV<K, V>, Map<K, Iterable<V>>, W> {
-   private MultimapPCollectionView(
-       Pipeline pipeline,
-       WindowingStrategy<?, W> windowingStrategy,
-       Coder<KV<K, V>> valueCoder) {
-     super(pipeline, windowingStrategy, valueCoder);
-   }
-
-   /** Groups the values of all elements by key and returns the grouping as a read-only map. */
-   @Override
-   protected Map<K, Iterable<V>> fromElements(Iterable<WindowedValue<KV<K, V>>> elements) {
-     // NOTE(review): HashMultimap keeps the values of a key in a set, so repeated identical
-     // (key, value) pairs are collapsed into one -- confirm that this is intended.
-     Multimap<K, V> valuesByKey = HashMultimap.create();
-     for (WindowedValue<KV<K, V>> windowed : elements) {
-       KV<K, V> entry = windowed.getValue();
-       valuesByKey.put(entry.getKey(), entry.getValue());
-     }
-     // Safe covariant cast (Collection<V> to Iterable<V>) that Java cannot express without
-     // rawtypes, even with unchecked casts.
-     @SuppressWarnings({"unchecked", "rawtypes"})
-     Map<K, Iterable<V>> asIterables = (Map) valuesByKey.asMap();
-     return Collections.unmodifiableMap(asIterables);
-   }
- }
-
- /**
-  * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>} with
-  * one value per key to {@code Map<K, V>}.
-  *
-  * <p>For internal use only.
-  */
- public static class MapPCollectionView<K, V, W extends BoundedWindow>
-     extends PCollectionViewBase<KV<K, V>, Map<K, V>, W> {
-   private MapPCollectionView(
-       Pipeline pipeline,
-       WindowingStrategy<?, W> windowingStrategy,
-       Coder<KV<K, V>> valueCoder) {
-     super(pipeline, windowingStrategy, valueCoder);
-   }
-
-   /**
-    * Builds a read-only map from the key-value elements.
-    *
-    * <p>Input iterable must actually be {@code Iterable<WindowedValue<KV<K, V>>>}.
-    *
-    * @throws IllegalArgumentException if the same key occurs in more than one element
-    */
-   @Override
-   protected Map<K, V> fromElements(Iterable<WindowedValue<KV<K, V>>> elements) {
-     Map<K, V> result = new HashMap<>();
-     for (WindowedValue<KV<K, V>> windowed : elements) {
-       KV<K, V> entry = windowed.getValue();
-       K key = entry.getKey();
-       if (result.containsKey(key)) {
-         throw new IllegalArgumentException("Duplicate values for " + key);
-       }
-       result.put(key, entry.getValue());
-     }
-     return Collections.unmodifiableMap(result);
-   }
- }
-
- /**
-  * Shared skeleton for the {@link PCollectionView} implementations in this file. It carries
-  * additional type parameters that are not visible at pipeline assembly time when the view
-  * is used as a side input.
-  */
- private abstract static class PCollectionViewBase<ElemT, ViewT, W extends BoundedWindow>
-     extends PValueBase
-     implements PCollectionView<ViewT> {
-   /** A unique tag for the view, typed according to the elements underlying the view. */
-   private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
-
-   /** The windowing strategy for the PCollection underlying the view. */
-   private WindowingStrategy<?, W> windowingStrategy;
-
-   /** The coder for the elements underlying the view. */
-   private Coder<Iterable<WindowedValue<ElemT>>> coder;
-
-   /**
-    * Converts all of the elements of the underlying {@link PCollection} to the value of the
-    * view. Subclasses implement this to complete the implementation.
-    */
-   protected abstract ViewT fromElements(Iterable<WindowedValue<ElemT>> elements);
-
-   /**
-    * Initializes the fields for which this base class provides boilerplate accessors.
-    *
-    * @throws IllegalArgumentException if the window function of {@code windowingStrategy}
-    *     is an {@link InvalidWindows}
-    */
-   protected PCollectionViewBase(
-       Pipeline pipeline,
-       TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-       WindowingStrategy<?, W> windowingStrategy,
-       Coder<ElemT> valueCoder) {
-     super(pipeline);
-     boolean invalidWindows = windowingStrategy.getWindowFn() instanceof InvalidWindows;
-     if (invalidWindows) {
-       throw new IllegalArgumentException("WindowFn of PCollectionView cannot be InvalidWindows");
-     }
-     Coder<WindowedValue<ElemT>> windowedValueCoder =
-         WindowedValue.getFullCoder(valueCoder, windowingStrategy.getWindowFn().windowCoder());
-     this.tag = tag;
-     this.windowingStrategy = windowingStrategy;
-     this.coder = IterableCoder.of(windowedValueCoder);
-   }
-
-   /**
-    * Same as the four-argument constructor, but with an auto-generated tag.
-    */
-   protected PCollectionViewBase(
-       Pipeline pipeline,
-       WindowingStrategy<?, W> windowingStrategy,
-       Coder<ElemT> valueCoder) {
-     this(pipeline, new TupleTag<Iterable<WindowedValue<ElemT>>>(), windowingStrategy, valueCoder);
-   }
-
-   /**
-    * For serialization only. Do not use directly. Subclasses should call from their own
-    * protected no-argument constructor.
-    */
-   @SuppressWarnings("unused") // used for serialization
-   protected PCollectionViewBase() {
-     super();
-   }
-
-   /** Delegates to {@link #fromElements} after restoring the element type. */
-   @Override
-   @SuppressWarnings({"rawtypes", "unchecked"})
-   public ViewT fromIterableInternal(Iterable<WindowedValue<?>> elements) {
-     // Safe cast: it is required that the rest of the SDK maintain the invariant
-     // that a PCollectionView is only provided an iterable for the elements of an
-     // appropriately typed PCollection.
-     return fromElements((Iterable) elements);
-   }
-
-   /**
-    * Returns a unique {@link TupleTag} identifying this {@link PCollectionView}.
-    *
-    * <p>For internal use only by runner implementors.
-    */
-   @Override
-   @SuppressWarnings({"rawtypes", "unchecked"})
-   public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
-     // Safe cast: It is required that the rest of the SDK maintain the invariant that
-     // this tag is only used to access the contents of an appropriately typed underlying
-     // PCollection
-     return (TupleTag) tag;
-   }
-
-   /**
-    * Returns the {@link WindowingStrategy} of this {@link PCollectionView}, which should
-    * be that of the underlying {@link PCollection}.
-    *
-    * <p>For internal use only by runner implementors.
-    */
-   @Override
-   public WindowingStrategy<?, ?> getWindowingStrategyInternal() {
-     return windowingStrategy;
-   }
-
-   /** Returns the coder for the windowed elements underlying this view, untyped. */
-   @Override
-   @SuppressWarnings({"rawtypes", "unchecked"})
-   public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
-     // Safe cast: It is required that the rest of the SDK only use this untyped coder
-     // for the elements of an appropriately typed underlying PCollection.
-     return (Coder) coder;
-   }
-
-   @Override
-   public int hashCode() {
-     return Objects.hash(tag);
-   }
-
-   /** Views are equal when their tags are equal. */
-   @Override
-   public boolean equals(Object other) {
-     // instanceof is false for null, so no separate null check is needed.
-     if (!(other instanceof PCollectionView)) {
-       return false;
-     }
-     PCollectionView<?> that = (PCollectionView<?>) other;
-     return tag.equals(that.getTagInternal());
-   }
-
-   @Override
-   public String toString() {
-     return MoreObjects.toStringHelper(this)
-         .add("tag", tag)
-         .toString();
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PTuple.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PTuple.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PTuple.java
deleted file mode 100644
index 5b87b5c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PTuple.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * A {@code PTuple} is an immutable tuple of
- * heterogeneously-typed values, "keyed" by {@link TupleTag}s.
- *
- * <p>PTuples can be created and accessed as follows:
- * <pre> {@code
- * String v1 = ...;
- * Integer v2 = ...;
- * Iterable<String> v3 = ...;
- *
- * // Create TupleTags for each of the values to put in the
- * // PTuple (the type of the TupleTag enables tracking the
- * // static type of each of the values in the PTuple):
- * TupleTag<String> tag1 = new TupleTag<>();
- * TupleTag<Integer> tag2 = new TupleTag<>();
- * TupleTag<Iterable<String>> tag3 = new TupleTag<>();
- *
- * // Create a PTuple with three values:
- * PTuple povs =
- *     PTuple.of(tag1, v1)
- *         .and(tag2, v2)
- *         .and(tag3, v3);
- *
- * // Create an empty PTuple:
- * PTuple povs2 = PTuple.empty();
- *
- * // Get values out of a PTuple, using the same tags
- * // that were used to put them in:
- * Integer vX = povs.get(tag2);
- * String vY = povs.get(tag1);
- * Iterable<String> vZ = povs.get(tag3);
- *
- * // Get a map of all values in a PTuple:
- * Map<TupleTag<?>, ?> allVs = povs.getAll();
- * } </pre>
- */
-public class PTuple {
-  /**
-   * Returns an empty PTuple.
-   *
-   * <p>Longer PTuples can be created by calling
-   * {@link #and} on the result.
-   */
-  public static PTuple empty() {
-    return new PTuple();
-  }
-
-  /**
-   * Returns a singleton PTuple containing the given
-   * value keyed by the given TupleTag.
-   *
-   * <p>Longer PTuples can be created by calling
-   * {@link #and} on the result.
-   */
-  public static <V> PTuple of(TupleTag<V> tag, V value) {
-    return empty().and(tag, value);
-  }
-
-  /**
-   * Returns a new PTuple that has all the values and
-   * tags of this PTuple plus the given value and tag.
-   * This PTuple itself is not modified.
-   *
-   * <p>The given TupleTag should not already be mapped to a
-   * value in this PTuple. This is not checked: if it is, the
-   * returned PTuple maps the tag to the given value instead.
-   */
-  public <V> PTuple and(TupleTag<V> tag, V value) {
-    Map<TupleTag<?>, Object> newMap = new LinkedHashMap<TupleTag<?>, Object>(valueMap);
-    newMap.put(tag, value);
-    return new PTuple(newMap);
-  }
-
-  /**
-   * Returns whether this PTuple contains a value with
-   * the given tag.
-   */
-  public <V> boolean has(TupleTag<V> tag) {
-    return valueMap.containsKey(tag);
-  }
-
-  /**
-   * Returns true if this {@code PTuple} is empty.
-   */
-  public boolean isEmpty() {
-    return valueMap.isEmpty();
-  }
-
-  /**
-   * Returns the value with the given tag in this
-   * PTuple.
-   *
-   * @throws IllegalArgumentException if there is no
-   *     such value, i.e., {@code !has(tag)}.
-   */
-  public <V> V get(TupleTag<V> tag) {
-    if (!has(tag)) {
-      throw new IllegalArgumentException(
-          "TupleTag not found in this PTuple");
-    }
-    // The static type of the tag tracks the type of the value stored under it.
-    @SuppressWarnings("unchecked")
-    V value = (V) valueMap.get(tag);
-    return value;
-  }
-
-  /**
-   * Returns an immutable Map from TupleTag to corresponding
-   * value, for all the members of this PTuple.
-   */
-  public Map<TupleTag<?>, ?> getAll() {
-    return valueMap;
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal details below here.
-
-  /** Unmodifiable view of a map that only this PTuple holds a reference to. */
-  private final Map<TupleTag<?>, ?> valueMap;
-
-  private PTuple() {
-    this(new LinkedHashMap<TupleTag<?>, Object>());
-  }
-
-  /** Wraps {@code valueMap} without copying it; callers must pass a map they do not retain. */
-  private PTuple(Map<TupleTag<?>, ?> valueMap) {
-    this.valueMap = Collections.unmodifiableMap(valueMap);
-  }
-
-  /**
-   * Returns a PTuple with each of the given tags mapping
-   * to the corresponding value.
-   *
-   * <p>The given map is copied, so later changes to it do not
-   * affect the returned PTuple.
-   *
-   * <p>For internal use only.
-   */
-  public static PTuple ofInternal(Map<TupleTag<?>, ?> valueMap) {
-    // Defensive copy: without it the caller could mutate this "immutable" tuple afterwards.
-    return new PTuple(new LinkedHashMap<TupleTag<?>, Object>(valueMap));
-  }
-}