You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:33 UTC
[09/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
deleted file mode 100644
index 2e2d1f6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ /dev/null
@@ -1,843 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly;
-import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo.Timing;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window.ClosingBehavior;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.ReduceFnContextFactory.OnTriggerCallbacks;
-import com.google.cloud.dataflow.sdk.util.ReduceFnContextFactory.StateStyle;
-import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode;
-import com.google.cloud.dataflow.sdk.util.state.ReadableState;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespaces.WindowNamespace;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * Manages the execution of a {@link ReduceFn} after a {@link GroupByKeyOnly} has partitioned the
- * {@link PCollection} by key.
- *
- * <p>The {@link #onTrigger} relies on a {@link TriggerRunner} to manage the execution of
- * the triggering logic. The {@code ReduceFnRunner}'s responsibilities are:
- *
- * <ul>
- * <li>Tracking the windows that are active (have buffered data) as elements arrive and
- * triggers are fired.
- * <li>Holding the watermark based on the timestamps of elements in a pane and releasing it
- * when the trigger fires.
- * <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger execution, timer
- * firings, etc, and providing appropriate contexts to the {@link ReduceFn} for actions
- * such as output.
- * <li>Scheduling garbage collection of state associated with a specific window, and making that
- * happen when the appropriate timer fires.
- * </ul>
- *
- * @param <K> The type of key being processed.
- * @param <InputT> The type of values associated with the key.
- * @param <OutputT> The output type that will be produced for each key.
- * @param <W> The type of windows this operates on.
- */
-public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
-
- /**
- * The {@link ReduceFnRunner} depends on most aspects of the {@link WindowingStrategy}.
- *
- * <ul>
- * <li>It runs the trigger from the {@link WindowingStrategy}.</li>
- * <li>It merges windows according to the {@link WindowingStrategy}.</li>
- * <li>It chooses how to track active windows and clear out expired windows
- * according to the {@link WindowingStrategy}, based on the allowed lateness and
- * whether windows can merge.</li>
- * <li>It decides whether to emit empty final panes according to whether the
- * {@link WindowingStrategy} requires it.</li>
- * <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.</li>
- * </ul>
- */
- private final WindowingStrategy<Object, W> windowingStrategy;
-
- private final OutputWindowedValue<KV<K, OutputT>> outputter;
-
- private final StateInternals<K> stateInternals;
-
- private final Aggregator<Long, Long> droppedDueToClosedWindow;
-
- private final K key;
-
- private final OnMergeCallback onMergeCallback = new OnMergeCallback();
-
- /**
- * Track which windows are still active and which 'state address' windows contain state
- * for a merged window.
- *
- * <ul>
- * <li>State: Global map for all active windows for this computation and key.
- * <li>Lifetime: Cleared when no active windows need to be tracked. A window lives within
- * the active window set until its trigger is closed or the window is garbage collected.
- * </ul>
- */
- private final ActiveWindowSet<W> activeWindows;
-
- /**
- * Always a {@link SystemReduceFn}.
- *
- * <ul>
- * <li>State: A bag of accumulated values, or the intermediate result of a combiner.
- * <li>State style: RENAMED
- * <li>Merging: Concatenate or otherwise combine the state from each merged window.
- * <li>Lifetime: Cleared when a pane fires if DISCARDING_FIRED_PANES. Otherwise cleared
- * when trigger is finished or when the window is garbage collected.
- * </ul>
- */
- private final ReduceFn<K, InputT, OutputT, W> reduceFn;
-
- /**
- * Manage the setting and firing of timer events.
- *
- * <ul>
- * <li>Merging: End-of-window and garbage collection timers are cancelled when windows are
- * merged away. Timers created by triggers are never garbage collected and are left to
- * fire and be ignored.
- * <li>Lifetime: Timers automatically disappear after they fire.
- * </ul>
- */
- private final TimerInternals timerInternals;
-
- /**
- * Manage the execution and state for triggers.
- *
- * <ul>
- * <li>State: Tracks which sub-triggers have finished, and any additional state needed to
- * determine when the trigger should fire.
- * <li>State style: DIRECT
- * <li>Merging: Finished bits are explicitly managed. Other state is eagerly merged as
- * needed.
- * <li>Lifetime: Most trigger state is cleared when the final pane is emitted. However
- * the finished bits are left behind and must be cleared when the window is
- * garbage collected.
- * </ul>
- */
- private final TriggerRunner<W> triggerRunner;
-
- /**
- * Store the output watermark holds for each window.
- *
- * <ul>
- * <li>State: Bag of hold timestamps.
- * <li>State style: RENAMED
- * <li>Merging: Depending on {@link OutputTimeFn}, may need to be recalculated on merging.
- * When a pane fires it may be necessary to add (back) an end-of-window or garbage collection
- * hold.
- * <li>Lifetime: Cleared when a pane fires or when the window is garbage collected.
- * </ul>
- */
- private final WatermarkHold<W> watermarkHold;
-
- private final ReduceFnContextFactory<K, InputT, OutputT, W> contextFactory;
-
- /**
- * Store the previously emitted pane (if any) for each window.
- *
- * <ul>
- * <li>State: The previous {@link PaneInfo} passed to the user's {@link DoFn#processElement},
- * if any.
- * <li>State style: DIRECT
- * <li>Merging: Always keyed by actual window, so does not depend on {@link #activeWindows}.
- * Cleared when window is merged away.
- * <li>Lifetime: Cleared when trigger is closed or window is garbage collected.
- * </ul>
- */
- private final PaneInfoTracker paneInfoTracker;
-
- /**
- * Store whether we've seen any elements for a window since the last pane was emitted.
- *
- * <ul>
- * <li>State: Unless DISCARDING_FIRED_PANES, a count of number of elements added so far.
- * <li>State style: RENAMED.
- * <li>Merging: Counts are summed when windows are merged.
- * <li>Lifetime: Cleared when pane fires or window is garbage collected.
- * </ul>
- */
- private final NonEmptyPanes<K, W> nonEmptyPanes;
-
- public ReduceFnRunner(
- K key,
- WindowingStrategy<?, W> windowingStrategy,
- StateInternals<K> stateInternals,
- TimerInternals timerInternals,
- WindowingInternals<?, KV<K, OutputT>> windowingInternals,
- Aggregator<Long, Long> droppedDueToClosedWindow,
- ReduceFn<K, InputT, OutputT, W> reduceFn,
- PipelineOptions options) {
- this.key = key;
- this.timerInternals = timerInternals;
- this.paneInfoTracker = new PaneInfoTracker(timerInternals);
- this.stateInternals = stateInternals;
- this.outputter = new OutputViaWindowingInternals<>(windowingInternals);
- this.droppedDueToClosedWindow = droppedDueToClosedWindow;
- this.reduceFn = reduceFn;
-
- @SuppressWarnings("unchecked")
- WindowingStrategy<Object, W> objectWindowingStrategy =
- (WindowingStrategy<Object, W>) windowingStrategy;
- this.windowingStrategy = objectWindowingStrategy;
-
- this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, this.reduceFn);
-
- // Note this may incur I/O to load persisted window set data.
- this.activeWindows = createActiveWindowSet();
-
- this.contextFactory =
- new ReduceFnContextFactory<K, InputT, OutputT, W>(key, reduceFn, this.windowingStrategy,
- stateInternals, this.activeWindows, timerInternals, windowingInternals, options);
-
- this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);
- this.triggerRunner =
- new TriggerRunner<>(
- windowingStrategy.getTrigger(),
- new TriggerContextFactory<>(windowingStrategy, stateInternals, activeWindows));
- }
-
- private ActiveWindowSet<W> createActiveWindowSet() {
- return windowingStrategy.getWindowFn().isNonMerging()
- ? new NonMergingActiveWindowSet<W>()
- : new MergingActiveWindowSet<W>(windowingStrategy.getWindowFn(), stateInternals);
- }
-
- @VisibleForTesting
- boolean isFinished(W window) {
- return triggerRunner.isClosed(contextFactory.base(window, StateStyle.DIRECT).state());
- }
-
- /**
- * Incorporate {@code values} into the underlying reduce function, and manage holds, timers,
- * triggers, and window merging.
- *
- * <p>The general strategy is:
- * <ol>
- * <li>Use {@link WindowedValue#getWindows} (itself determined using
- * {@link WindowFn#assignWindows}) to determine which windows each element belongs to. Some
- * of those windows will already have state associated with them. The rest are considered
- * NEW.
- * <li>Use {@link WindowFn#mergeWindows} to attempt to merge currently ACTIVE and NEW windows.
- * Each NEW window will become either ACTIVE, MERGED, or EPHEMERAL. (See {@link
- * ActiveWindowSet} for definitions of these terms.)
- * <li>If at all possible, eagerly substitute EPHEMERAL windows with their ACTIVE state address
- * windows before any state is associated with the EPHEMERAL window. In the common case that
- * windows for new elements are merged into existing ACTIVE windows then no additional
- * storage or merging overhead will be incurred.
- * <li>Otherwise, keep track of the state address windows for ACTIVE windows so that their
- * states can be merged on-demand when a pane fires.
- * <li>Process the element for each of the windows its windows have been merged into according
- * to {@link ActiveWindowSet}. Processing may require running triggers, setting timers,
- * setting holds, and invoking {@link ReduceFn#onTrigger}.
- * </ol>
- */
- public void processElements(Iterable<WindowedValue<InputT>> values) throws Exception {
- // If an incoming element introduces a new window, attempt to merge it into an existing
- // window eagerly. The outcome is stored in the ActiveWindowSet.
- collectAndMergeWindows(values);
-
- Set<W> windowsToConsider = new HashSet<>();
-
- // Process each element, using the updated activeWindows determined by collectAndMergeWindows.
- for (WindowedValue<InputT> value : values) {
- windowsToConsider.addAll(processElement(value));
- }
-
- // Trigger output from any window for which the trigger is ready
- for (W mergedWindow : windowsToConsider) {
- ReduceFn<K, InputT, OutputT, W>.Context directContext =
- contextFactory.base(mergedWindow, StateStyle.DIRECT);
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
- contextFactory.base(mergedWindow, StateStyle.RENAMED);
- triggerRunner.prefetchShouldFire(mergedWindow, directContext.state());
- emitIfAppropriate(directContext, renamedContext);
- }
-
- // We're all done with merging and emitting elements so can compress the activeWindow state.
- activeWindows.removeEphemeralWindows();
- }
-
- public void persist() {
- activeWindows.persist();
- }
-
- /**
- * Extract the windows associated with the values, and invoke merge.
- */
- private void collectAndMergeWindows(Iterable<WindowedValue<InputT>> values) throws Exception {
- // No-op if no merging can take place
- if (windowingStrategy.getWindowFn().isNonMerging()) {
- return;
- }
-
- // Collect the windows from all elements (except those which are too late) and
- // make sure they are already in the active window set or are added as NEW windows.
- for (WindowedValue<?> value : values) {
- for (BoundedWindow untypedWindow : value.getWindows()) {
- @SuppressWarnings("unchecked")
- W window = (W) untypedWindow;
-
- ReduceFn<K, InputT, OutputT, W>.Context directContext =
- contextFactory.base(window, StateStyle.DIRECT);
- if (triggerRunner.isClosed(directContext.state())) {
- // This window has already been closed.
- // We will update the counter for this in the corresponding processElement call.
- continue;
- }
-
- if (activeWindows.isActive(window)) {
- Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
- if (stateAddressWindows.size() > 1) {
- // This is a legacy window whose state has not been eagerly merged.
- // Do that now.
- ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
- contextFactory.forPremerge(window);
- reduceFn.onMerge(premergeContext);
- watermarkHold.onMerge(premergeContext);
- activeWindows.merged(window);
- }
- }
-
- // Add this window as NEW if we've not yet seen it.
- activeWindows.addNew(window);
- }
- }
-
- // Merge all of the active windows and retain a mapping from source windows to result windows.
- mergeActiveWindows();
- }
-
- private class OnMergeCallback implements ActiveWindowSet.MergeCallback<W> {
- /**
- * Called from the active window set to indicate {@code toBeMerged} (of which only
- * {@code activeToBeMerged} are ACTIVE and thus have state associated with them) will later
- * be merged into {@code mergeResult}.
- */
- @Override
- public void prefetchOnMerge(
- Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeResult) throws Exception {
- ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
- contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT);
- ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
- contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.RENAMED);
-
- // Prefetch various state.
- triggerRunner.prefetchForMerge(mergeResult, activeToBeMerged, directMergeContext.state());
- reduceFn.prefetchOnMerge(renamedMergeContext.state());
- watermarkHold.prefetchOnMerge(renamedMergeContext.state());
- nonEmptyPanes.prefetchOnMerge(renamedMergeContext.state());
- }
-
- /**
- * Called from the active window set to indicate {@code toBeMerged} (of which only
- * {@code activeToBeMerged} are ACTIVE and thus have state associated with them) are about
- * to be merged into {@code mergeResult}.
- */
- @Override
- public void onMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeResult)
- throws Exception {
- // At this point activeWindows has NOT incorporated the results of the merge.
- ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
- contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT);
- ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
- contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.RENAMED);
-
- // Run the reduceFn to perform any needed merging.
- reduceFn.onMerge(renamedMergeContext);
-
- // Merge the watermark holds.
- watermarkHold.onMerge(renamedMergeContext);
-
- // Merge non-empty pane state.
- nonEmptyPanes.onMerge(renamedMergeContext.state());
-
- // Have the trigger merge state as needed
- triggerRunner.onMerge(
- directMergeContext.window(), directMergeContext.timers(), directMergeContext.state());
-
- for (W active : activeToBeMerged) {
- if (active.equals(mergeResult)) {
- // Not merged away.
- continue;
- }
- // Cleanup flavor A: Currently ACTIVE window is about to become MERGED.
- // Clear any state not already cleared by the onMerge calls above.
- WindowTracing.debug("ReduceFnRunner.onMerge: Merging {} into {}", active, mergeResult);
- ReduceFn<K, InputT, OutputT, W>.Context directClearContext =
- contextFactory.base(active, StateStyle.DIRECT);
- // No need for the end-of-window or garbage collection timers.
- // We will establish a new end-of-window or garbage collection timer for the mergeResult
- // window in processElement below. There must be at least one element for the mergeResult
- // window since a new element with a new window must have triggered this onMerge.
- cancelEndOfWindowAndGarbageCollectionTimers(directClearContext);
- // We no longer care about any previous panes of merged away windows. The
- // merge result window gets to start fresh if it is new.
- paneInfoTracker.clear(directClearContext.state());
- }
- }
- }
-
- private void mergeActiveWindows() throws Exception {
- activeWindows.merge(onMergeCallback);
- }
-
- /**
- * Process an element.
- * @param value the value being processed
- *
- * @return the set of windows in which the element was actually processed
- */
- private Collection<W> processElement(WindowedValue<InputT> value) throws Exception {
- // Redirect element windows to the ACTIVE windows they have been merged into.
- // The compressed representation (value, {window1, window2, ...}) actually represents
- // distinct elements (value, window1), (value, window2), ...
- // so if window1 and window2 merge, the resulting window will contain both copies
- // of the value.
- Collection<W> windows = new ArrayList<>();
- for (BoundedWindow untypedWindow : value.getWindows()) {
- @SuppressWarnings("unchecked")
- W window = (W) untypedWindow;
- W active = activeWindows.representative(window);
- Preconditions.checkState(active != null, "Window %s should have been added", window);
- windows.add(active);
- }
-
- // Prefetch in each of the windows if we're going to need to process triggers
- for (W window : windows) {
- ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
- window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
- triggerRunner.prefetchForValue(window, directContext.state());
- }
-
- // Process the element for each (representative) window it belongs to.
- for (W window : windows) {
- ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
- window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
- ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue(
- window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
-
- // Check to see if the triggerRunner thinks the window is closed. If so, drop that window.
- if (triggerRunner.isClosed(directContext.state())) {
- droppedDueToClosedWindow.addValue(1L);
- WindowTracing.debug(
- "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
- + "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
- value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
- timerInternals.currentOutputWatermarkTime());
- continue;
- }
-
- nonEmptyPanes.recordContent(renamedContext.state());
-
- // Make sure we've scheduled the end-of-window or garbage collection timer for this window.
- Instant timer = scheduleEndOfWindowOrGarbageCollectionTimer(directContext);
-
- // Hold back progress of the output watermark until we have processed the pane this
- // element will be included within. If the element is too late for that, place a hold at
- // the end-of-window or garbage collection time to allow empty panes to contribute elements
- // which won't be dropped due to lateness by a following computation (assuming the following
- // computation uses the same allowed lateness value...)
- @Nullable Instant hold = watermarkHold.addHolds(renamedContext);
-
- if (hold != null) {
- // Assert that holds have a proximate timer.
- boolean holdInWindow = !hold.isAfter(window.maxTimestamp());
- boolean timerInWindow = !timer.isAfter(window.maxTimestamp());
- Preconditions.checkState(
- holdInWindow == timerInWindow,
- "set a hold at %s, a timer at %s, which disagree as to whether they are in window %s",
- hold,
- timer,
- directContext.window());
- }
-
- // Execute the reduceFn, which will buffer the value as appropriate
- reduceFn.processValue(renamedContext);
-
- // Run the trigger to update its state
- triggerRunner.processValue(
- directContext.window(),
- directContext.timestamp(),
- directContext.timers(),
- directContext.state());
- }
-
- return windows;
- }
-
- /**
- * Called when an end-of-window, garbage collection, or trigger-specific timer fires.
- */
- public void onTimer(TimerData timer) throws Exception {
- // Which window is the timer for?
- Preconditions.checkArgument(timer.getNamespace() instanceof WindowNamespace,
- "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
- @SuppressWarnings("unchecked")
- WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
- W window = windowNamespace.getWindow();
- ReduceFn<K, InputT, OutputT, W>.Context directContext =
- contextFactory.base(window, StateStyle.DIRECT);
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
- contextFactory.base(window, StateStyle.RENAMED);
-
- // Has this window had its trigger finish?
- // - The trigger may implement isClosed as constant false.
- // - If the window function does not support windowing then all windows will be considered
- // active.
- // So we must take conjunction of activeWindows and triggerRunner state.
- boolean windowIsActive =
- activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state());
-
- if (!windowIsActive) {
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
- }
-
- // If this is a garbage collection timer then we should trigger and garbage collect the window.
- Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
- boolean isGarbageCollection =
- TimeDomain.EVENT_TIME == timer.getDomain() && timer.getTimestamp().equals(cleanupTime);
-
- if (isGarbageCollection) {
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
- + "inputWatermark:{}; outputWatermark:{}",
- key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
- timerInternals.currentOutputWatermarkTime());
-
- if (windowIsActive) {
- // We need to call onTrigger to emit the final pane if required.
- // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
- // and the watermark has passed the end of the window.
- onTrigger(directContext, renamedContext, true/* isFinished */);
- }
-
- // Cleanup flavor B: Clear all the remaining state for this window since we'll never
- // see elements for it again.
- clearAllState(directContext, renamedContext, windowIsActive);
- } else {
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
- + "inputWatermark:{}; outputWatermark:{}",
- key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
- timerInternals.currentOutputWatermarkTime());
- if (windowIsActive) {
- emitIfAppropriate(directContext, renamedContext);
- }
-
- // If this is an end-of-window timer then we need to set a GC timer
- boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
- && timer.getTimestamp().equals(window.maxTimestamp());
- if (isEndOfWindow) {
- // Since we are processing an on-time firing we should schedule the garbage collection
- // timer. (If getAllowedLateness is zero then the timer event will be considered a
- // cleanup event and handled by the above).
- // Note we must do this even if the trigger is finished so that we are sure to cleanup
- // any final trigger tombstones.
- Preconditions.checkState(
- windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
- "Unexpected zero getAllowedLateness");
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
- + "inputWatermark:{}; outputWatermark:{}",
- key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
- timerInternals.currentOutputWatermarkTime());
- directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
- }
- }
- }
-
- /**
- * Clear all the state associated with {@code context}'s window.
- * Should only be invoked if we know all future elements for this window will be considered
- * beyond allowed lateness.
- * This is a superset of the clearing done by {@link #emitIfAppropriate} below since:
- * <ol>
- * <li>We can clear the trigger state tombstone since we'll never need to ask about it again.
- * <li>We can clear any remaining garbage collection hold.
- * </ol>
- */
- private void clearAllState(
- ReduceFn<K, InputT, OutputT, W>.Context directContext,
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
- boolean windowIsActive)
- throws Exception {
- if (windowIsActive) {
- // Since the window is in the active window set AND its trigger was not yet closed,
- // it is possible we still have state.
- reduceFn.clearState(renamedContext);
- watermarkHold.clearHolds(renamedContext);
- nonEmptyPanes.clearPane(renamedContext.state());
- triggerRunner.clearState(
- directContext.window(), directContext.timers(), directContext.state());
- } else {
- // Needed only for backwards compatibility over UPDATE.
- // Clear any end-of-window or garbage collection holds keyed by the current window.
- // Only needed if:
- // - We have merging windows.
- // - We are DISCARDING_FIRED_PANES.
- // - A pane has fired.
- // - But the trigger is not (yet) closed.
- if (windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES
- && !windowingStrategy.getWindowFn().isNonMerging()) {
- watermarkHold.clearHolds(directContext);
- }
- }
- paneInfoTracker.clear(directContext.state());
- if (activeWindows.isActive(directContext.window())) {
- // Don't need to track address state windows anymore.
- activeWindows.remove(directContext.window());
- }
- // We'll never need to test for the trigger being closed again.
- triggerRunner.clearFinished(directContext.state());
- }
-
- /** Should the reduce function state be cleared? */
- private boolean shouldDiscardAfterFiring(boolean isFinished) {
- if (isFinished) {
- // This is the last firing for trigger.
- return true;
- }
- if (windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES) {
- // Nothing should be accumulated between panes.
- return true;
- }
- return false;
- }
-
- /**
- * Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state.
- */
- private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directContext,
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
- throws Exception {
- if (!triggerRunner.shouldFire(
- directContext.window(), directContext.timers(), directContext.state())) {
- // Ignore unless trigger is ready to fire
- return;
- }
-
- // Inform the trigger of the transition to see if it is finished
- triggerRunner.onFire(directContext.window(), directContext.timers(), directContext.state());
- boolean isFinished = triggerRunner.isClosed(directContext.state());
-
- // Will be able to clear all element state after triggering?
- boolean shouldDiscard = shouldDiscardAfterFiring(isFinished);
-
- // Run onTrigger to produce the actual pane contents.
- // As a side effect it will clear all element holds, but not necessarily any
- // end-of-window or garbage collection holds.
- onTrigger(directContext, renamedContext, isFinished);
-
- // Now that we've triggered, the pane is empty.
- nonEmptyPanes.clearPane(renamedContext.state());
-
- // Cleanup buffered data if appropriate
- if (shouldDiscard) {
- // Cleanup flavor C: The user does not want any buffered data to persist between panes.
- reduceFn.clearState(renamedContext);
- }
-
- if (isFinished) {
- // Cleanup flavor D: If trigger is closed we will ignore all new incoming elements.
- // Clear state not otherwise cleared by onTrigger and clearPane above.
- // Remember the trigger is, indeed, closed until the window is garbage collected.
- triggerRunner.clearState(
- directContext.window(), directContext.timers(), directContext.state());
- paneInfoTracker.clear(directContext.state());
- activeWindows.remove(directContext.window());
- }
- }
-
- /**
- * Do we need to emit a pane?
- */
- private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
- if (!isEmpty) {
- // The pane has elements.
- return true;
- }
- if (timing == Timing.ON_TIME) {
- // This is the unique ON_TIME pane.
- return true;
- }
- if (isFinished && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_ALWAYS) {
- // This is known to be the final pane, and the user has requested it even when empty.
- return true;
- }
- return false;
- }
-
- /**
- * Run the {@link ReduceFn#onTrigger} method and produce any necessary output.
- *
- * <p>Every state read is first requested with {@code readLater()} and only afterwards resolved
- * with {@code read()}. The next {@link PaneInfo} is calculated and the watermark hold is
- * extracted before the emit decision is made, so both happen whether or not a pane is emitted.
- * {@code reduceFn.onTrigger} is only invoked when {@link #needToEmit} returns {@code true}.
- *
- * @param directContext context supplying the window being fired; also passed to the pane info
- * tracker, and its state is used for the {@code reduceFn} and trigger prefetches
- * @param renamedContext context passed to the watermark hold; its state is used for the
- * non-empty-panes check
- * @param isFinished forwarded to the watermark hold, the pane info tracker and
- * {@link #needToEmit}
- * @throws Exception declared by this method; the originating call is not established here
- */
- private void onTrigger(
- final ReduceFn<K, InputT, OutputT, W>.Context directContext,
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
- boolean isFinished)
- throws Exception {
- // Prefetch necessary states: every readLater() below is issued before the first read().
- ReadableState<Instant> outputTimestampFuture =
- watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
- ReadableState<PaneInfo> paneFuture =
- paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
- ReadableState<Boolean> isEmptyFuture =
- nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
-
- reduceFn.prefetchOnTrigger(directContext.state());
- triggerRunner.prefetchOnFire(directContext.window(), directContext.state());
-
- // Calculate the pane info.
- final PaneInfo pane = paneFuture.read();
- // Extract the window hold, and as a side effect clear it.
- final Instant outputTimestamp = outputTimestampFuture.read();
-
- // Only emit a pane if it has data or empty panes are observable.
- if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
- // Run reduceFn.onTrigger method.
- final List<W> windows = Collections.singletonList(directContext.window());
- ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
- contextFactory.forTrigger(directContext.window(), paneFuture, StateStyle.RENAMED,
- new OnTriggerCallbacks<OutputT>() {
- @Override
- public void output(OutputT toOutput) {
- // We're going to output panes, so commit the (now used) PaneInfo.
- // TODO: This is unnecessary if the trigger isFinished since the saved
- // state will be immediately deleted.
- paneInfoTracker.storeCurrentPaneInfo(directContext, pane);
-
- // Output the actual value.
- outputter.outputWindowedValue(
- KV.of(key, toOutput), outputTimestamp, windows, pane);
- }
- });
-
- reduceFn.onTrigger(renamedTriggerContext);
- }
- }
-
- /**
-  * Ensures that a timer will eventually fire which tells us to garbage collect the window
-  * state, and returns the time at which that timer will fire. For efficiency this may be done
-  * in two steps rather than one.
-  *
-  * <ul>
-  *   <li>If allowedLateness is zero then garbage collection happens at the end of the window.
-  *       For simplicity we set our own timer for this situation even though an
-  *       {@link AfterWatermark} trigger may have also set an end-of-window timer.
-  *       ({@code setTimer} is idempotent.)
-  *   <li>If allowedLateness is non-zero then we could always set a timer for the garbage
-  *       collection time. However, if the windows are large (eg hourly) and the allowedLateness
-  *       is small (eg seconds) that would leave nearly twice the number of timers in-flight.
-  *       Instead an end-of-window timer is set first and rolled forward to a garbage
-  *       collection timer when it fires. The input watermark distinguishes the two cases: once
-  *       it is known and past the window's maximum timestamp, the garbage collection timer
-  *       (maximum timestamp plus allowedLateness) is the one scheduled.
-  * </ul>
-  *
-  * @param directContext context supplying the window and the timers to set
-  * @return the event time at which the scheduled timer fires
-  */
- private Instant scheduleEndOfWindowOrGarbageCollectionTimer(
-     ReduceFn<?, ?, ?, W>.Context directContext) {
-   Instant inputWatermark = timerInternals.currentInputWatermarkTime();
-   Instant windowEnd = directContext.window().maxTimestamp();
-
-   // A null input watermark is treated the same as one that has not yet passed the window.
-   boolean watermarkPastWindow = inputWatermark != null && windowEnd.isBefore(inputWatermark);
-   Instant fireTime =
-       watermarkPastWindow
-           ? windowEnd.plus(windowingStrategy.getAllowedLateness())
-           : windowEnd;
-   String timerKind = watermarkPastWindow ? "garbage collection" : "end-of-window";
-
-   WindowTracing.trace(
-       "ReduceFnRunner.scheduleEndOfWindowOrGarbageCollectionTimer: Scheduling {} timer at {} for "
-       + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
-       timerKind,
-       fireTime,
-       key,
-       directContext.window(),
-       inputWatermark,
-       timerInternals.currentOutputWatermarkTime());
-   directContext.timers().setTimer(fireTime, TimeDomain.EVENT_TIME);
-   return fireTime;
- }
-
- /**
-  * Deletes the event-time timer at the window's maximum timestamp and, when the allowed
-  * lateness is longer than zero, also the event-time timer at the maximum timestamp plus the
-  * allowed lateness.
-  *
-  * @param context context supplying the window and the timers to delete
-  */
- private void cancelEndOfWindowAndGarbageCollectionTimers(ReduceFn<?, ?, ?, W>.Context context) {
-   WindowTracing.debug(
-       "ReduceFnRunner.cancelEndOfWindowAndGarbageCollectionTimers: Deleting timers for "
-       + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
-       key, context.window(), timerInternals.currentInputWatermarkTime(),
-       timerInternals.currentOutputWatermarkTime());
-
-   Instant endOfWindow = context.window().maxTimestamp();
-   context.timers().deleteTimer(endOfWindow, TimeDomain.EVENT_TIME);
-
-   Duration allowedLateness = windowingStrategy.getAllowedLateness();
-   if (!allowedLateness.isLongerThan(Duration.ZERO)) {
-     // With no allowed lateness there is no second, later timer to delete.
-     return;
-   }
-   Instant garbageCollectionTime = endOfWindow.plus(allowedLateness);
-   context.timers().deleteTimer(garbageCollectionTime, TimeDomain.EVENT_TIME);
- }
-
- /**
-  * Something that can emit a value together with all of its windowing information. This is a
-  * deliberately restricted subinterface of {@link WindowingInternals}, expressing exactly how
-  * it is used here.
-  *
-  * @param <OutputT> the type of value that can be output
-  */
- private interface OutputWindowedValue<OutputT> {
-   /**
-    * Outputs {@code output} with the given timestamp, windows and pane.
-    */
-   void outputWindowedValue(
-       OutputT output,
-       Instant timestamp,
-       Collection<? extends BoundedWindow> windows,
-       PaneInfo pane);
- }
-
- /**
-  * An {@link OutputWindowedValue} that forwards every call unchanged to a
-  * {@link WindowingInternals}.
-  */
- private static class OutputViaWindowingInternals<OutputT>
-     implements OutputWindowedValue<OutputT> {
-
-   /** Receiver of every forwarded output. */
-   private final WindowingInternals<?, OutputT> delegate;
-
-   public OutputViaWindowingInternals(WindowingInternals<?, OutputT> windowingInternals) {
-     delegate = windowingInternals;
-   }
-
-   @Override
-   public void outputWindowedValue(
-       OutputT output,
-       Instant timestamp,
-       Collection<? extends BoundedWindow> windows,
-       PaneInfo pane) {
-     delegate.outputWindowedValue(output, timestamp, windows, pane);
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampAndWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampAndWindowsDoFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampAndWindowsDoFn.java
deleted file mode 100644
index 88a1c15..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampAndWindowsDoFn.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.values.KV;
-
-/**
- * A {@link DoFn} that makes each element's timestamp and window assignments explicit by
- * wrapping the value part of every key/value pair in a {@link WindowedValue}. The key is
- * passed through unchanged.
- *
- * @param <K> the type of the keys of the input and output {@code PCollection}s
- * @param <V> the type of the values of the input {@code PCollection}
- */
-@SystemDoFnInternal
-public class ReifyTimestampAndWindowsDoFn<K, V>
-    extends DoFn<KV<K, V>, KV<K, WindowedValue<V>>> {
-  @Override
-  public void processElement(ProcessContext c)
-      throws Exception {
-    KV<K, V> element = c.element();
-    K key = element.getKey();
-    // Capture the element's timestamp, windows and pane alongside its value.
-    WindowedValue<V> reified =
-        WindowedValue.of(
-            element.getValue(),
-            c.timestamp(),
-            c.windowingInternals().windows(),
-            c.pane());
-    c.output(KV.of(key, reified));
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Reshuffle.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Reshuffle.java
deleted file mode 100644
index 367db2d..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Reshuffle.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.NonMergingWindowFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.Collection;
-
-/**
- * A {@link PTransform} whose output {@link PCollection} is equivalent to its input, but which
- * operationally provides some of the side effects of a {@link GroupByKey}: in particular it
- * prevents fusion of the surrounding transforms, and provides checkpointing and deduplication
- * by id (see {@link ValueWithRecordId}).
- *
- * <p>Internally a {@link GroupByKey} is performed so that the data is key-partitioned. The
- * {@link WindowingStrategy} is configured so that no data is dropped, without affecting the
- * need for the user to specify allowed lateness and accumulation mode before a user-inserted
- * GroupByKey.
- *
- * @param <K> The type of key being reshuffled on.
- * @param <V> The type of value being reshuffled.
- */
-public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
-
-  private Reshuffle() {
-  }
-
-  /** Returns a new {@code Reshuffle} transform. */
-  public static <K, V> Reshuffle<K, V> of() {
-    return new Reshuffle<K, V>();
-  }
-
-  @Override
-  public PCollection<KV<K, V>> apply(PCollection<KV<K, V>> input) {
-    WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
-    Duration maximumLateness = Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
-
-    // If the input has already had its windows merged, then the GBK that performed the merge
-    // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
-    // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
-    Window.Bound<KV<K, V>> rewindow = Window
-        .<KV<K, V>>into(new PassThroughWindowFn<>(originalStrategy.getWindowFn()))
-        .triggering(new ReshuffleTrigger<>())
-        .discardingFiredPanes()
-        .withAllowedLateness(maximumLateness);
-
-    PCollection<KV<K, Iterable<V>>> grouped =
-        input.apply(rewindow)
-            .apply(GroupByKey.<K, V>create())
-            // Set the windowing strategy directly, so that it doesn't get counted as the user
-            // having set allowed lateness.
-            .setWindowingStrategyInternal(originalStrategy);
-
-    // Flatten each group back into one output pair per grouped value.
-    return grouped.apply(ParDo.named("ExpandIterable").of(
-        new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
-          @Override
-          public void processElement(ProcessContext c) {
-            KV<K, Iterable<V>> group = c.element();
-            K key = group.getKey();
-            for (V value : group.getValue()) {
-              c.output(KV.of(key, value));
-            }
-          }
-        }));
-  }
-
-  /**
-   * A {@link WindowFn} that leaves all associations between elements and windows unchanged.
-   *
-   * <p>To implement all the abstract methods of {@link WindowFn}, this needs the prior
-   * {@link WindowFn}, to which all auxiliary functionality is delegated.
-   */
-  private static class PassThroughWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> {
-
-    /** The WindowFn prior to this. Used for its windowCoder, etc. */
-    private final WindowFn<?, BoundedWindow> priorWindowFn;
-
-    // The cast is safe because it is only used privately here. At every point where a window is
-    // returned or accepted, it has been provided by priorWindowFn, so it is of the type expected.
-    @SuppressWarnings("unchecked")
-    public PassThroughWindowFn(WindowFn<?, ?> priorWindowFn) {
-      this.priorWindowFn = (WindowFn<?, BoundedWindow>) priorWindowFn;
-    }
-
-    // The windows are provided by priorWindowFn, which also provides the coder for them.
-    @SuppressWarnings("unchecked")
-    @Override
-    public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext c)
-        throws Exception {
-      return (Collection<BoundedWindow>) c.windows();
-    }
-
-    @Override
-    public boolean isCompatible(WindowFn<?, ?> other) {
-      String message =
-          getClass().getCanonicalName()
-              + ".isCompatible() should never be called."
-              + " It is a private implementation detail of Reshuffle."
-              + " This message indicates a bug in the Dataflow SDK.";
-      throw new UnsupportedOperationException(message);
-    }
-
-    @Override
-    public Coder<BoundedWindow> windowCoder() {
-      // Safe because priorWindowFn provides the windows also.
-      // The Coder is _not_ actually a coder for an arbitrary BoundedWindow.
-      return priorWindowFn.windowCoder();
-    }
-
-    @Override
-    public BoundedWindow getSideInputWindow(BoundedWindow window) {
-      String message =
-          getClass().getCanonicalName()
-              + ".getSideInputWindow() should never be called."
-              + " It is a private implementation detail of Reshuffle."
-              + " This message indicates a bug in the Dataflow SDK.";
-      throw new UnsupportedOperationException(message);
-    }
-
-    /** Returns the input timestamp unchanged as the output time. */
-    @Override
-    public Instant getOutputTime(Instant inputTimestamp, BoundedWindow window) {
-      return inputTimestamp;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReshuffleTrigger.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReshuffleTrigger.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReshuffleTrigger.java
deleted file mode 100644
index 248f005..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReshuffleTrigger.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
-
-import org.joda.time.Instant;
-
-import java.util.List;
-
-/**
- * The {@link Trigger} used with {@link Reshuffle}. It triggers on every element and never
- * buffers state: {@link #shouldFire} always answers {@code true} and every callback is a no-op.
- *
- * @param <W> The kind of window that is being reshuffled.
- */
-public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger<W> {
-
-  ReshuffleTrigger() {
-    super(null);
-  }
-
-  /** Always ready to fire. */
-  @Override
-  public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
-    return true;
-  }
-
-  @Override
-  public void onFire(Trigger<W>.TriggerContext context) throws Exception {
-    // Nothing to do when firing.
-  }
-
-  @Override
-  public void onElement(Trigger<W>.OnElementContext c) {
-    // Nothing to record per element.
-  }
-
-  @Override
-  public void onMerge(Trigger<W>.OnMergeContext c) {
-    // Nothing to merge.
-  }
-
-  /** This trigger is its own continuation. */
-  @Override
-  protected Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
-    return this;
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(W window) {
-    throw new UnsupportedOperationException(
-        "ReshuffleTrigger should not be used outside of Reshuffle");
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/RetryHttpRequestInitializer.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/RetryHttpRequestInitializer.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/RetryHttpRequestInitializer.java
deleted file mode 100644
index 756dce0..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/RetryHttpRequestInitializer.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.http.HttpBackOffIOExceptionHandler;
-import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
-import com.google.api.client.http.HttpRequest;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpResponse;
-import com.google.api.client.http.HttpResponseInterceptor;
-import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.ExponentialBackOff;
-import com.google.api.client.util.NanoClock;
-import com.google.api.client.util.Sleeper;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * Implements a request initializer that adds retry handlers to all
- * HttpRequests.
- *
- * <p>This allows chaining through to another HttpRequestInitializer, since
- * clients have exactly one HttpRequestInitializer, and Credential is also
- * a required HttpRequestInitializer.
- *
- * <p>Also can take a HttpResponseInterceptor to be applied to the responses.
- */
-public class RetryHttpRequestInitializer implements HttpRequestInitializer {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RetryHttpRequestInitializer.class);
-
-  /**
-   * Http response codes that should be silently ignored.
-   */
-  private static final Set<Integer> DEFAULT_IGNORED_RESPONSE_CODES = new HashSet<>(
-      Arrays.asList(307 /* Redirect, handled by the client library */,
-                    308 /* Resume Incomplete, handled by the client library */));
-
-  /**
-   * Http response timeout to use for hanging gets.
-   */
-  private static final int HANGING_GET_TIMEOUT_SEC = 80;
-
-  /**
-   * An IOException handler that delegates the retry decision to
-   * {@link HttpBackOffIOExceptionHandler} and logs the outcome.
-   */
-  private static class LoggingHttpBackOffIOExceptionHandler
-      extends HttpBackOffIOExceptionHandler {
-    public LoggingHttpBackOffIOExceptionHandler(BackOff backOff) {
-      super(backOff);
-    }
-
-    @Override
-    public boolean handleIOException(HttpRequest request, boolean supportsRetry)
-        throws IOException {
-      boolean willRetry = super.handleIOException(request, supportsRetry);
-      if (willRetry) {
-        LOG.debug("Request failed with IOException, will retry: {}", request.getUrl());
-      } else {
-        LOG.warn("Request failed with IOException, will NOT retry: {}", request.getUrl());
-      }
-      return willRetry;
-    }
-  }
-
-  /**
-   * An unsuccessful-response handler that delegates the retry decision to a
-   * {@link HttpBackOffUnsuccessfulResponseHandler} configured to back off on 5xx and 429
-   * responses, and logs the outcome. Non-retried responses whose status code is in the ignored
-   * set are not logged.
-   */
-  private static class LoggingHttpBackoffUnsuccessfulResponseHandler
-      implements HttpUnsuccessfulResponseHandler {
-    private final HttpBackOffUnsuccessfulResponseHandler handler;
-    private final Set<Integer> ignoredResponseCodes;
-
-    public LoggingHttpBackoffUnsuccessfulResponseHandler(BackOff backoff,
-        Sleeper sleeper, Set<Integer> ignoredResponseCodes) {
-      this.ignoredResponseCodes = ignoredResponseCodes;
-      handler = new HttpBackOffUnsuccessfulResponseHandler(backoff);
-      handler.setSleeper(sleeper);
-      handler.setBackOffRequired(
-          new HttpBackOffUnsuccessfulResponseHandler.BackOffRequired() {
-            @Override
-            public boolean isRequired(HttpResponse response) {
-              int statusCode = response.getStatusCode();
-              return (statusCode / 100 == 5) ||  // 5xx: server error
-                  statusCode == 429;             // 429: Too many requests
-            }
-          });
-    }
-
-    @Override
-    public boolean handleResponse(HttpRequest request, HttpResponse response,
-        boolean supportsRetry) throws IOException {
-      boolean retry = handler.handleResponse(request, response, supportsRetry);
-      if (retry) {
-        LOG.debug("Request failed with code {} will retry: {}",
-            response.getStatusCode(), request.getUrl());
-
-      } else if (!ignoredResponseCodes.contains(response.getStatusCode())) {
-        LOG.warn("Request failed with code {}, will NOT retry: {}",
-            response.getStatusCode(), request.getUrl());
-      }
-
-      return retry;
-    }
-  }
-
-  /** Optional downstream initializer, applied before this one; {@code null} when absent. */
-  @Deprecated
-  @Nullable
-  private final HttpRequestInitializer chained;
-
-  /** Optional interceptor installed on every request; {@code null} when absent. */
-  @Nullable
-  private final HttpResponseInterceptor responseInterceptor;
-
-  private final NanoClock nanoClock;  // used for testing
-
-  private final Sleeper sleeper;  // used for testing
-
-  /** The default ignored codes plus any additional ones supplied at construction. */
-  private final Set<Integer> ignoredResponseCodes =
-      new HashSet<>(DEFAULT_IGNORED_RESPONSE_CODES);
-
-  public RetryHttpRequestInitializer() {
-    this(Collections.<Integer>emptyList());
-  }
-
-  /**
-   * @param chained a downstream HttpRequestInitializer, which will also be
-   *                applied to HttpRequest initialization.  May be null.
-   *
-   * @deprecated use {@link #RetryHttpRequestInitializer()}.
-   */
-  @Deprecated
-  public RetryHttpRequestInitializer(@Nullable HttpRequestInitializer chained) {
-    this(chained, Collections.<Integer>emptyList());
-  }
-
-  /**
-   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
-   */
-  public RetryHttpRequestInitializer(Collection<Integer> additionalIgnoredResponseCodes) {
-    this(additionalIgnoredResponseCodes, null);
-  }
-
-
-  /**
-   * @param chained a downstream HttpRequestInitializer, which will also be
-   *                applied to HttpRequest initialization.  May be null.
-   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
-   *
-   * @deprecated use {@link #RetryHttpRequestInitializer(Collection)}.
-   */
-  @Deprecated
-  public RetryHttpRequestInitializer(@Nullable HttpRequestInitializer chained,
-      Collection<Integer> additionalIgnoredResponseCodes) {
-    this(chained, additionalIgnoredResponseCodes, null);
-  }
-
-  /**
-   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
-   * @param responseInterceptor HttpResponseInterceptor to be applied on all requests. May be null.
-   */
-  public RetryHttpRequestInitializer(
-      Collection<Integer> additionalIgnoredResponseCodes,
-      @Nullable HttpResponseInterceptor responseInterceptor) {
-    this(null, NanoClock.SYSTEM, Sleeper.DEFAULT, additionalIgnoredResponseCodes,
-        responseInterceptor);
-  }
-
-  /**
-   * @param chained a downstream HttpRequestInitializer, which will also be applied to HttpRequest
-   * initialization.  May be null.
-   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
-   * @param responseInterceptor HttpResponseInterceptor to be applied on all requests. May be null.
-   *
-   * @deprecated use {@link #RetryHttpRequestInitializer(Collection, HttpResponseInterceptor)}.
-   */
-  @Deprecated
-  public RetryHttpRequestInitializer(
-      @Nullable HttpRequestInitializer chained,
-      Collection<Integer> additionalIgnoredResponseCodes,
-      @Nullable HttpResponseInterceptor responseInterceptor) {
-    this(chained, NanoClock.SYSTEM, Sleeper.DEFAULT, additionalIgnoredResponseCodes,
-        responseInterceptor);
-  }
-
-  /**
-   * Visible for testing.
-   *
-   * @param chained a downstream HttpRequestInitializer, which will also be
-   *                applied to HttpRequest initialization.  May be null.
-   * @param nanoClock used as a timing source for knowing how much time has elapsed.
-   * @param sleeper used to sleep between retries.
-   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
-   * @param responseInterceptor HttpResponseInterceptor to be applied on all requests. May be null.
-   */
-  RetryHttpRequestInitializer(@Nullable HttpRequestInitializer chained,
-      NanoClock nanoClock, Sleeper sleeper, Collection<Integer> additionalIgnoredResponseCodes,
-      @Nullable HttpResponseInterceptor responseInterceptor) {
-    this.chained = chained;
-    this.nanoClock = nanoClock;
-    this.sleeper = sleeper;
-    this.ignoredResponseCodes.addAll(additionalIgnoredResponseCodes);
-    this.responseInterceptor = responseInterceptor;
-  }
-
-  /**
-   * Applies the chained initializer (if any), then sets the read timeout, the retry handlers
-   * for unsuccessful responses and IOExceptions, and the response interceptor (if any).
-   */
-  @Override
-  public void initialize(HttpRequest request) throws IOException {
-    if (chained != null) {
-      chained.initialize(request);
-    }
-
-    // Set a timeout for hanging-gets.
-    // TODO: Do this exclusively for work requests.
-    request.setReadTimeout(HANGING_GET_TIMEOUT_SEC * 1000);
-
-    // Back off on retryable http errors.
-    request.setUnsuccessfulResponseHandler(
-        // A back-off multiplier of 2 raises the maximum request retrying time
-        // to approximately 5 minutes (keeping other back-off parameters to
-        // their default values).
-        new LoggingHttpBackoffUnsuccessfulResponseHandler(
-            new ExponentialBackOff.Builder().setNanoClock(nanoClock)
-                                            .setMultiplier(2).build(),
-            sleeper, ignoredResponseCodes));
-
-    // Retry immediately on IOExceptions.
-    LoggingHttpBackOffIOExceptionHandler loggingBackoffHandler =
-        new LoggingHttpBackOffIOExceptionHandler(BackOff.ZERO_BACKOFF);
-    request.setIOExceptionHandler(loggingBackoffHandler);
-
-    // Set the response interceptor, if one was supplied.
-    if (responseInterceptor != null) {
-      request.setResponseInterceptor(responseInterceptor);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SerializableUtils.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SerializableUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SerializableUtils.java
deleted file mode 100644
index 501b430..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SerializableUtils.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray;
-import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.common.base.Preconditions;
-
-import org.xerial.snappy.SnappyInputStream;
-import org.xerial.snappy.SnappyOutputStream;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.Arrays;
-
-/**
- * Utilities for working with Serializables.
- */
-public class SerializableUtils {
-  /**
-   * Serializes the argument into an array of bytes, and returns it. The bytes are the
-   * Java-serialized form of {@code value}, compressed with Snappy.
-   *
-   * @throws IllegalArgumentException if there are errors when serializing
-   */
-  public static byte[] serializeToByteArray(Serializable value) {
-    try {
-      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-      // Closing the stream before reading the buffer flushes any compressed data still pending.
-      try (ObjectOutputStream oos = new ObjectOutputStream(new SnappyOutputStream(buffer))) {
-        oos.writeObject(value);
-      }
-      return buffer.toByteArray();
-    } catch (IOException exn) {
-      throw new IllegalArgumentException(
-          "unable to serialize " + value,
-          exn);
-    }
-  }
-
-  /**
-   * Deserializes an object from the given array of bytes, e.g., as
-   * serialized using {@link #serializeToByteArray}, and returns it.
-   *
-   * <p>NOTE(review): this uses Java native deserialization, which can instantiate arbitrary
-   * classes from the input. Only pass bytes that come from a trusted source.
-   *
-   * @throws IllegalArgumentException if there are errors when
-   * deserializing, using the provided description to identify what
-   * was being deserialized
-   */
-  public static Object deserializeFromByteArray(byte[] encodedValue,
-      String description) {
-    try {
-      try (ObjectInputStream ois = new ObjectInputStream(
-          new SnappyInputStream(new ByteArrayInputStream(encodedValue)))) {
-        return ois.readObject();
-      }
-    } catch (IOException | ClassNotFoundException exn) {
-      throw new IllegalArgumentException(
-          "unable to deserialize " + description,
-          exn);
-    }
-  }
-
-  /**
-   * Serializes {@code value} and deserializes the result, returning the deserialized copy.
-   *
-   * @throws IllegalArgumentException if the value cannot be serialized or deserialized
-   */
-  public static <T extends Serializable> T ensureSerializable(T value) {
-    return roundTrip(value);
-  }
-
-  /**
-   * Returns a copy of {@code value} produced by serializing and then deserializing it.
-   *
-   * @throws IllegalArgumentException if the value cannot be serialized or deserialized
-   */
-  public static <T extends Serializable> T clone(T value) {
-    return roundTrip(value);
-  }
-
-  /**
-   * Shared implementation of {@link #ensureSerializable(Serializable)} and {@link #clone}.
-   */
-  private static <T extends Serializable> T roundTrip(T value) {
-    byte[] serialized = serializeToByteArray(value);
-    // String.valueOf keeps a null value (which serializes fine) from failing here with a
-    // NullPointerException while merely building the error description.
-    @SuppressWarnings("unchecked")
-    T copy = (T) deserializeFromByteArray(serialized, String.valueOf(value));
-    return copy;
-  }
-
-  /**
-   * Serializes a Coder and verifies that it can be correctly deserialized.
-   *
-   * <p>Throws a RuntimeException if serialized Coder cannot be deserialized, or
-   * if the deserialized instance is not equal to the original.
-   *
-   * @return the serialized Coder, as a {@link CloudObject}
-   */
-  public static CloudObject ensureSerializable(Coder<?> coder) {
-    // Make sure that Coders are java serializable as well since
-    // they are regularly captured within DoFn's.
-    Coder<?> copy = (Coder<?>) ensureSerializable((Serializable) coder);
-
-    CloudObject cloudObject = copy.asCloudObject();
-
-    Coder<?> decoded;
-    try {
-      decoded = Serializer.deserialize(cloudObject, Coder.class);
-    } catch (RuntimeException e) {
-      throw new RuntimeException(
-          String.format("Unable to deserialize Coder: %s. "
-              + "Check that a suitable constructor is defined.  "
-              + "See Coder for details.", coder), e
-      );
-    }
-    // The template overload formats the message only when the check fails.
-    Preconditions.checkState(coder.equals(decoded),
-        "Coder not equal to original after serialization, "
-            + "indicating that the Coder may not implement serialization "
-            + "correctly.  Before: %s, after: %s, cloud encoding: %s",
-        coder, decoded, cloudObject);
-
-    return cloudObject;
-  }
-
-  /**
-   * Serializes an arbitrary T with the given {@code Coder<T>} and verifies
-   * that it can be correctly deserialized.
-   *
-   * @param errorContext prefix for the message of any exception thrown
-   * @return the value decoded from the encoded bytes
-   * @throws IllegalArgumentException if the value cannot be encoded or decoded
-   */
-  public static <T> T ensureSerializableByCoder(
-      Coder<T> coder, T value, String errorContext) {
-      byte[] encodedValue;
-      try {
-        encodedValue = encodeToByteArray(coder, value);
-      } catch (CoderException exn) {
-        // TODO: Put in better element printing:
-        // truncate if too long.
-        throw new IllegalArgumentException(
-            errorContext + ": unable to encode value "
-            + value + " using " + coder,
-            exn);
-      }
-      try {
-        return decodeFromByteArray(coder, encodedValue);
-      } catch (CoderException exn) {
-        // TODO: Put in better encoded byte array printing:
-        // use printable chars with escapes instead of codes, and
-        // truncate if too long.
-        throw new IllegalArgumentException(
-            errorContext + ": unable to decode " + Arrays.toString(encodedValue)
-            + ", encoding of value " + value + ", using " + coder,
-            exn);
-      }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Serializer.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Serializer.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Serializer.java
deleted file mode 100644
index 6a8a337..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Serializer.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * Utility for converting objects between Java and Cloud representations.
- */
-public final class Serializer {
- // Delay initialization of statics until the first call to Serializer.
- private static class SingletonHelper {
- static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
- static final ObjectMapper TREE_MAPPER = createTreeMapper();
-
- /**
- * Creates the object mapper that will be used for serializing Google API
- * client maps into Jackson trees.
- */
- private static ObjectMapper createTreeMapper() {
- return new ObjectMapper();
- }
-
- /**
- * Creates the object mapper that will be used for deserializing Jackson
- * trees into objects.
- */
- private static ObjectMapper createObjectMapper() {
- ObjectMapper m = new ObjectMapper();
- // Ignore properties that are not used by the object.
- m.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
-
- // For parameters of type Object, use the @type property to determine the
- // class to instantiate.
- //
- // TODO: It would be ideal to do this for all non-final classes. The
- // problem with using DefaultTyping.NON_FINAL is that it insists on having
- // type information in the JSON for classes with useful default
- // implementations, such as List. Ideally, we'd combine these defaults
- // with available type information if that information's present.
- m.enableDefaultTypingAsProperty(
- ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT,
- PropertyNames.OBJECT_TYPE_NAME);
-
- m.registerModule(new CoderUtils.Jackson2Module());
-
- return m;
- }
- }
-
- /**
- * Deserializes an object from a Dataflow structured encoding (represented in
- * Java as a map).
- *
- * <p>The standard Dataflow SDK object serialization protocol is based on JSON.
- * Data is typically encoded as a JSON object whose fields represent the
- * object's data.
- *
- * <p>The actual deserialization is performed by Jackson, which can deserialize
- * public fields, use JavaBean setters, or use injection annotations to
- * indicate how to construct the object. The {@link ObjectMapper} used is
- * configured to use the "@type" field as the name of the class to instantiate
- * (supporting polymorphic types), and may be further configured by
- * annotations or via {@link ObjectMapper#registerModule}.
- *
- * @see <a href="http://wiki.fasterxml.com/JacksonFAQ#Data_Binding.2C_general">
- * Jackson Data-Binding</a>
- * @see <a href="https://github.com/FasterXML/jackson-annotations/wiki/Jackson-Annotations">
- * Jackson-Annotations</a>
- * @param serialized the object in untyped decoded form (i.e. a nested {@link Map})
- * @param clazz the expected object class
- */
- public static <T> T deserialize(Map<String, Object> serialized, Class<T> clazz) {
- try {
- return SingletonHelper.OBJECT_MAPPER.treeToValue(
- SingletonHelper.TREE_MAPPER.valueToTree(
- deserializeCloudKnownTypes(serialized)),
- clazz);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(
- "Unable to deserialize class " + clazz, e);
- }
- }
-
- /**
- * Recursively walks the supplied map, looking for well-known cloud type
- * information (keyed as {@link PropertyNames#OBJECT_TYPE_NAME}, matching a
- * URI value from the {@link CloudKnownType} enum. Upon finding this type
- * information, it converts it into the correspondingly typed Java value.
- */
- @SuppressWarnings("unchecked")
- private static Object deserializeCloudKnownTypes(Object src) {
- if (src instanceof Map) {
- Map<String, Object> srcMap = (Map<String, Object>) src;
- @Nullable Object value = srcMap.get(PropertyNames.SCALAR_FIELD_NAME);
- @Nullable CloudKnownType type =
- CloudKnownType.forUri((String) srcMap.get(PropertyNames.OBJECT_TYPE_NAME));
- if (type != null && value != null) {
- // It's a value of a well-known cloud type; let the known type handler
- // handle the translation.
- Object result = type.parse(value, type.defaultClass());
- return result;
- }
- // Otherwise, it's just an ordinary map.
- Map<String, Object> dest = new HashMap<>(srcMap.size());
- for (Map.Entry<String, Object> entry : srcMap.entrySet()) {
- dest.put(entry.getKey(), deserializeCloudKnownTypes(entry.getValue()));
- }
- return dest;
- }
- if (src instanceof List) {
- List<Object> srcList = (List<Object>) src;
- List<Object> dest = new ArrayList<>(srcList.size());
- for (Object obj : srcList) {
- dest.add(deserializeCloudKnownTypes(obj));
- }
- return dest;
- }
- // Neither a Map nor a List; no translation needed.
- return src;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ShardingWritableByteChannel.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ShardingWritableByteChannel.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ShardingWritableByteChannel.java
deleted file mode 100644
index 54794ef..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ShardingWritableByteChannel.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-
-/**
- * Implements a WritableByteChannel that may contain multiple output shards.
- *
- * <p>This provides {@link #writeToShard}, which takes a shard number for
- * writing to a particular shard.
- *
- * <p>The channel is considered open if all downstream channels are open, and
- * closes all downstream channels when closed.
- */
-public class ShardingWritableByteChannel implements WritableByteChannel {
-
- /**
- * Special shard number that causes a write to all shards.
- */
- public static final int ALL_SHARDS = -2;
-
-
- private final ArrayList<WritableByteChannel> writers = new ArrayList<>();
-
- /**
- * Returns the number of output shards.
- */
- public int getNumShards() {
- return writers.size();
- }
-
- /**
- * Adds another shard output channel.
- */
- public void addChannel(WritableByteChannel writer) {
- writers.add(writer);
- }
-
- /**
- * Returns the WritableByteChannel associated with the given shard number.
- */
- public WritableByteChannel getChannel(int shardNum) {
- return writers.get(shardNum);
- }
-
- /**
- * Writes the buffer to the given shard.
- *
- * <p>This does not change the current output shard.
- *
- * @return The total number of bytes written. If the shard number is
- * {@link #ALL_SHARDS}, then the total is the sum of each individual shard
- * write.
- */
- public int writeToShard(int shardNum, ByteBuffer src) throws IOException {
- if (shardNum >= 0) {
- return writers.get(shardNum).write(src);
- }
-
- switch (shardNum) {
- case ALL_SHARDS:
- int size = 0;
- for (WritableByteChannel writer : writers) {
- size += writer.write(src);
- }
- return size;
-
- default:
- throw new IllegalArgumentException("Illegal shard number: " + shardNum);
- }
- }
-
- /**
- * Writes a buffer to all shards.
- *
- * <p>Same as calling {@code writeToShard(ALL_SHARDS, buf)}.
- */
- @Override
- public int write(ByteBuffer src) throws IOException {
- return writeToShard(ALL_SHARDS, src);
- }
-
- @Override
- public boolean isOpen() {
- for (WritableByteChannel writer : writers) {
- if (!writer.isOpen()) {
- return false;
- }
- }
-
- return true;
- }
-
- @Override
- public void close() throws IOException {
- for (WritableByteChannel writer : writers) {
- writer.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SideInputReader.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SideInputReader.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SideInputReader.java
deleted file mode 100644
index 37873f3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SideInputReader.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-import javax.annotation.Nullable;
-
-/**
- * The interface to objects that provide side inputs. Particular implementations
- * may read a side input directly or use appropriate sorts of caching, etc.
- */
-public interface SideInputReader {
- /**
- * Returns the value of the given {@link PCollectionView} for the given {@link BoundedWindow}.
- *
- * <p>It is valid for a side input to be {@code null}. It is <i>not</i> valid for this to
- * return {@code null} for any other reason.
- */
- @Nullable
- <T> T get(PCollectionView<T> view, BoundedWindow window);
-
- /**
- * Returns true if the given {@link PCollectionView} is valid for this reader.
- */
- <T> boolean contains(PCollectionView<T> view);
-
- /**
- * Returns true if there are no side inputs in this reader.
- */
- boolean isEmpty();
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SimpleDoFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SimpleDoFnRunner.java
deleted file mode 100644
index 15a5e51..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SimpleDoFnRunner.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager;
-import com.google.cloud.dataflow.sdk.util.ExecutionContext.StepContext;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet.AddCounterMutator;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import java.util.List;
-
-/**
- * Runs a {@link DoFn} by constructing the appropriate contexts and passing them in.
- *
- * @param <InputT> the type of the DoFn's (main) input elements
- * @param <OutputT> the type of the DoFn's (main) output elements
- */
-public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, OutputT>{
-
- protected SimpleDoFnRunner(PipelineOptions options, DoFn<InputT, OutputT> fn,
- SideInputReader sideInputReader,
- OutputManager outputManager,
- TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, StepContext stepContext,
- AddCounterMutator addCounterMutator, WindowingStrategy<?, ?> windowingStrategy) {
- super(options, fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags, stepContext,
- addCounterMutator, windowingStrategy);
- }
-
- @Override
- protected void invokeProcessElement(WindowedValue<InputT> elem) {
- final DoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
- // This can contain user code. Wrap it in case it throws an exception.
- try {
- fn.processElement(processContext);
- } catch (Exception ex) {
- throw wrapUserCodeException(ex);
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java
deleted file mode 100644
index 04fd599..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.services.dataflow.model.DataflowPackage;
-
-import java.util.List;
-
-/**
- * Interface for staging files needed for running a Dataflow pipeline.
- */
-public interface Stager {
- /* Stage files and return a list of packages. */
- public List<DataflowPackage> stageFiles();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/StreamUtils.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/StreamUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/StreamUtils.java
deleted file mode 100644
index 268eb7f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/StreamUtils.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.ref.SoftReference;
-
-/**
- * Utility functions for stream operations.
- */
-public class StreamUtils {
-
- private StreamUtils() {
- }
-
- private static final int BUF_SIZE = 8192;
-
- private static ThreadLocal<SoftReference<byte[]>> threadLocalBuffer = new ThreadLocal<>();
-
- /**
- * Efficient converting stream to bytes.
- */
- public static byte[] getBytes(InputStream stream) throws IOException {
- if (stream instanceof ExposedByteArrayInputStream) {
- // Fast path for the exposed version.
- return ((ExposedByteArrayInputStream) stream).readAll();
- } else if (stream instanceof ByteArrayInputStream) {
- // Fast path for ByteArrayInputStream.
- byte[] ret = new byte[stream.available()];
- stream.read(ret);
- return ret;
- }
- // Falls back to normal stream copying.
- SoftReference<byte[]> refBuffer = threadLocalBuffer.get();
- byte[] buffer = refBuffer == null ? null : refBuffer.get();
- if (buffer == null) {
- buffer = new byte[BUF_SIZE];
- threadLocalBuffer.set(new SoftReference<byte[]>(buffer));
- }
- ByteArrayOutputStream outStream = new ByteArrayOutputStream();
- while (true) {
- int r = stream.read(buffer);
- if (r == -1) {
- break;
- }
- outStream.write(buffer, 0, r);
- }
- return outStream.toByteArray();
- }
-
-}