You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/02 17:43:49 UTC
[02/11] incubator-beam git commit: Put classes in runners-core
package into runners.core namespace
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
deleted file mode 100644
index 24e33dd..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
+++ /dev/null
@@ -1,784 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.PriorityQueue;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Test utility that runs a {@link ReduceFn}, {@link WindowFn}, {@link Trigger} using in-memory stub
- * implementations to provide the {@link TimerInternals} and {@link WindowingInternals} needed to
- * run {@code Trigger}s and {@code ReduceFn}s.
- *
- * @param <InputT> The element types.
- * @param <OutputT> The final type for elements in the window (for instance,
- * {@code Iterable<InputT>})
- * @param <W> The type of windows being used.
- */
-public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
- /** The single key handed to the state internals and to every runner this tester creates. */
- private static final String KEY = "TEST_KEY";
-
- // In-memory state and timer implementations shared by all runners created by this tester.
- private final TestInMemoryStateInternals<String> stateInternals =
- new TestInMemoryStateInternals<>(KEY);
- private final TestTimerInternals timerInternals = new TestTimerInternals();
-
- private final WindowFn<Object, W> windowFn;
- private final TestWindowingInternals windowingInternals;
- private final Coder<OutputT> outputCoder;
- private final WindowingStrategy<Object, W> objectStrategy;
- private final ReduceFn<String, InputT, OutputT, W> reduceFn;
- private final PipelineOptions options;
-
- /**
- * If true, the output watermark is automatically advanced to the latest possible
- * point when the input watermark is advanced. This is the default for most tests.
- * If false, the output watermark must be explicitly advanced by the test, which can
- * be used to exercise some of the more subtle behavior of WatermarkHold.
- */
- private boolean autoAdvanceOutputWatermark;
-
- /** The trigger read from the windowing strategy when the tester is constructed. */
- private ExecutableTrigger executableTrigger;
-
- /** In-memory aggregator passed to each runner under the dropped-due-to-closed-window name. */
- private final InMemoryLongSumAggregator droppedDueToClosedWindow =
- new InMemoryLongSumAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
-
- /**
-  * Creates a tester for {@code windowingStrategy} whose {@link ReduceFn} buffers {@code Integer}
-  * inputs, using freshly created pipeline options and an empty side input reader.
-  */
- public static <W extends BoundedWindow> ReduceFnTester<Integer, Iterable<Integer>, W>
-     nonCombining(WindowingStrategy<?, W> windowingStrategy) throws Exception {
-   ReduceFn<String, Integer, Iterable<Integer>, W> bufferingFn =
-       SystemReduceFn.<String, Integer, W>buffering(VarIntCoder.of());
-   Coder<Iterable<Integer>> iterableCoder = IterableCoder.of(VarIntCoder.of());
-   return new ReduceFnTester<Integer, Iterable<Integer>, W>(
-       windowingStrategy,
-       bufferingFn,
-       iterableCoder,
-       PipelineOptionsFactory.create(),
-       NullSideInputReader.empty());
- }
-
- /**
-  * Convenience overload that assembles the windowing strategy from its parts (always using the
-  * earliest-input-timestamp output time function) and delegates to
-  * {@link #nonCombining(WindowingStrategy)}.
-  */
- public static <W extends BoundedWindow> ReduceFnTester<Integer, Iterable<Integer>, W>
-     nonCombining(WindowFn<?, W> windowFn, Trigger trigger, AccumulationMode mode,
-         Duration allowedDataLateness, ClosingBehavior closingBehavior) throws Exception {
-   return nonCombining(
-       WindowingStrategy.of(windowFn)
-           .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
-           .withTrigger(trigger)
-           .withMode(mode)
-           .withAllowedLateness(allowedDataLateness)
-           .withClosingBehavior(closingBehavior));
- }
-
- /**
-  * Creates a tester whose {@link ReduceFn} combines {@code Integer} inputs per key with
-  * {@code combineFn}, using freshly created pipeline options and an empty side input reader.
-  */
- public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W>
-     combining(WindowingStrategy<?, W> strategy,
-         KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn,
-         Coder<OutputT> outputCoder) throws Exception {
-   CoderRegistry coders = new CoderRegistry();
-   coders.registerStandardCoders();
-
-   KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
-   AppliedCombineFn<String, Integer, AccumT, OutputT> appliedFn =
-       AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder(
-           combineFn, coders, inputCoder);
-   ReduceFn<String, Integer, OutputT, W> combiningFn =
-       SystemReduceFn.<String, Integer, AccumT, OutputT, W>combining(
-           StringUtf8Coder.of(), appliedFn);
-
-   return new ReduceFnTester<Integer, OutputT, W>(
-       strategy,
-       combiningFn,
-       outputCoder,
-       PipelineOptionsFactory.create(),
-       NullSideInputReader.empty());
- }
-
- /**
-  * Creates a tester whose {@link ReduceFn} combines {@code Integer} inputs per key with a
-  * context-aware {@code combineFn}, using the supplied pipeline options and side input reader.
-  */
- public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W>
-     combining(WindowingStrategy<?, W> strategy,
-         KeyedCombineFnWithContext<String, Integer, AccumT, OutputT> combineFn,
-         Coder<OutputT> outputCoder,
-         PipelineOptions options,
-         SideInputReader sideInputReader) throws Exception {
-   CoderRegistry coders = new CoderRegistry();
-   coders.registerStandardCoders();
-
-   KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
-   AppliedCombineFn<String, Integer, AccumT, OutputT> appliedFn =
-       AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder(
-           combineFn, coders, inputCoder);
-   ReduceFn<String, Integer, OutputT, W> combiningFn =
-       SystemReduceFn.<String, Integer, AccumT, OutputT, W>combining(
-           StringUtf8Coder.of(), appliedFn);
-
-   return new ReduceFnTester<Integer, OutputT, W>(
-       strategy,
-       combiningFn,
-       outputCoder,
-       options,
-       sideInputReader);
- }
-
- /**
-  * Convenience overload that assembles the windowing strategy from its parts (always using the
-  * earliest-input-timestamp output time function) and delegates to the three-argument
-  * {@code combining} factory.
-  */
- public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W>
-     combining(WindowFn<?, W> windowFn, Trigger trigger, AccumulationMode mode,
-         KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn, Coder<OutputT> outputCoder,
-         Duration allowedDataLateness) throws Exception {
-   WindowingStrategy<?, W> assembled =
-       WindowingStrategy.of(windowFn)
-           .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
-           .withTrigger(trigger)
-           .withMode(mode)
-           .withAllowedLateness(allowedDataLateness);
-   return combining(assembled, combineFn, outputCoder);
- }
-
- /**
-  * Wires the tester together. Auto-advancing of the output watermark starts out enabled, and the
-  * trigger is read from the supplied strategy.
-  */
- private ReduceFnTester(WindowingStrategy<?, W> wildcardStrategy,
-     ReduceFn<String, InputT, OutputT, W> reduceFn, Coder<OutputT> outputCoder,
-     PipelineOptions options, SideInputReader sideInputReader) throws Exception {
-   // The tester only ever treats the strategy's element type as Object, so widen it once here.
-   @SuppressWarnings("unchecked")
-   WindowingStrategy<Object, W> widened = (WindowingStrategy<Object, W>) wildcardStrategy;
-
-   this.options = options;
-   this.outputCoder = outputCoder;
-   this.reduceFn = reduceFn;
-   this.objectStrategy = widened;
-   this.windowFn = widened.getWindowFn();
-   this.executableTrigger = wildcardStrategy.getTrigger();
-   this.windowingInternals = new TestWindowingInternals(sideInputReader);
-   this.autoAdvanceOutputWatermark = true;
- }
-
- /**
- * Chooses whether advancing the input watermark also advances the output watermark
- * ({@code true}) or whether the test advances the output watermark itself ({@code false}).
- */
- public void setAutoAdvanceOutputWatermark(boolean autoAdvanceOutputWatermark) {
- this.autoAdvanceOutputWatermark = autoAdvanceOutputWatermark;
- }
-
- /** Returns whatever the test timer internals report as the next timer time in {@code domain}. */
- @Nullable
- public Instant getNextTimer(TimeDomain domain) {
- return timerInternals.getNextTimer(domain);
- }
-
- /**
- * Builds a new {@link ReduceFnRunner} over this tester's shared in-memory state, timers,
- * windowing internals, aggregator, reduce function and options. A new runner is created for
- * every operation the tester performs.
- */
- ReduceFnRunner<String, InputT, OutputT, W> createRunner() {
- return new ReduceFnRunner<>(
- KEY,
- objectStrategy,
- stateInternals,
- timerInternals,
- windowingInternals,
- droppedDueToClosedWindow,
- reduceFn,
- options);
- }
-
- /** Returns the trigger that was read from the windowing strategy at construction time. */
- public ExecutableTrigger getTrigger() {
- return executableTrigger;
- }
-
- /** Returns what a freshly created runner reports from {@code isFinished} for {@code window}. */
- public boolean isMarkedFinished(W window) {
- return createRunner().isFinished(window);
- }
-
- /** Returns what a freshly created runner reports from {@code hasNoActiveWindows}. */
- public boolean hasNoActiveWindows() {
- return createRunner().hasNoActiveWindows();
- }
-
- @SafeVarargs
- public final void assertHasOnlyGlobalAndFinishedSetsFor(W... expectedWindows) {
- assertHasOnlyGlobalAndAllowedTags(
- ImmutableSet.copyOf(expectedWindows),
- ImmutableSet.<StateTag<? super String, ?>>of(TriggerRunner.FINISHED_BITS_TAG));
- }
-
- @SafeVarargs
- public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W... expectedWindows) {
- assertHasOnlyGlobalAndAllowedTags(
- ImmutableSet.copyOf(expectedWindows),
- ImmutableSet.<StateTag<? super String, ?>>of(
- TriggerRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG,
- WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
- WatermarkHold.EXTRA_HOLD_TAG));
- }
-
- public final void assertHasOnlyGlobalState() {
- assertHasOnlyGlobalAndAllowedTags(
- Collections.<W>emptySet(), Collections.<StateTag<? super String, ?>>emptySet());
- }
-
- @SafeVarargs
- public final void assertHasOnlyGlobalAndPaneInfoFor(W... expectedWindows) {
- assertHasOnlyGlobalAndAllowedTags(
- ImmutableSet.copyOf(expectedWindows),
- ImmutableSet.<StateTag<? super String, ?>>of(
- PaneInfoTracker.PANE_INFO_TAG,
- WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
- WatermarkHold.EXTRA_HOLD_TAG));
- }
-
- /**
-  * Verifies that the set of windows that have any state stored is exactly
-  * {@code expectedWindows} and that each of these windows has only tags from {@code allowedTags}.
-  * Global namespaces are ignored, and window-and-trigger namespaces must hold no tags at all.
-  */
- private void assertHasOnlyGlobalAndAllowedTags(
-     Set<W> expectedWindows, Set<StateTag<? super String, ?>> allowedTags) {
-   Set<StateNamespace> expectedNamespaces = new HashSet<>();
-   for (W window : expectedWindows) {
-     expectedNamespaces.add(windowNamespace(window));
-   }
-
-   Map<StateNamespace, Set<StateTag<? super String, ?>>> populated = new HashMap<>();
-   for (StateNamespace namespace : stateInternals.getNamespacesInUse()) {
-     if (namespace instanceof StateNamespaces.GlobalNamespace) {
-       continue;
-     }
-
-     if (namespace instanceof StateNamespaces.WindowNamespace) {
-       Set<StateTag<? super String, ?>> tagsInUse = stateInternals.getTagsInUse(namespace);
-       if (tagsInUse.isEmpty()) {
-         continue;
-       }
-       populated.put(namespace, tagsInUse);
-       if (!Sets.difference(tagsInUse, allowedTags).isEmpty()) {
-         fail(namespace + " has unexpected states: " + tagsInUse);
-       }
-       continue;
-     }
-
-     if (namespace instanceof StateNamespaces.WindowAndTriggerNamespace) {
-       Set<StateTag<? super String, ?>> tagsInUse = stateInternals.getTagsInUse(namespace);
-       assertTrue(namespace + " contains " + tagsInUse, tagsInUse.isEmpty());
-       continue;
-     }
-
-     fail("Unrecognized namespace " + namespace);
-   }
-
-   assertEquals(
-       "Still in use: " + populated.toString(), expectedNamespaces, populated.keySet());
- }
-
- /** Builds the state namespace for {@code window} using the window function's window coder. */
- private StateNamespace windowNamespace(W window) {
- return StateNamespaces.window(windowFn.windowCoder(), window);
- }
-
- /** Returns the earliest watermark hold found in the in-memory state, or null if there is none. */
- public Instant getWatermarkHold() {
- return stateInternals.earliestWatermarkHold();
- }
-
- /** Returns the current output watermark as tracked by the test timer internals. */
- public Instant getOutputWatermark() {
- return timerInternals.currentOutputWatermarkTime();
- }
-
- /** Returns the running sum held by the dropped-due-to-closed-window aggregator. */
- public long getElementsDroppedDueToClosedWindow() {
- return droppedDueToClosedWindow.getSum();
- }
-
- /**
- * Returns the number of captured output values that have not yet been removed by
- * {@link #extractOutput}.
- */
- public int getOutputSize() {
- return windowingInternals.outputs.size();
- }
-
- /**
- * Retrieve the values that have been output to this time, and clear out the output accumulator.
- */
- public List<WindowedValue<OutputT>> extractOutput() {
- ImmutableList<WindowedValue<OutputT>> result =
- FluentIterable.from(windowingInternals.outputs)
- .transform(new Function<WindowedValue<KV<String, OutputT>>, WindowedValue<OutputT>>() {
- @Override
- public WindowedValue<OutputT> apply(WindowedValue<KV<String, OutputT>> input) {
- return input.withValue(input.getValue().getValue());
- }
- })
- .toList();
- windowingInternals.outputs.clear();
- return result;
- }
-
- /**
- * Advance the input watermark to the specified time, firing any timers that should
- * fire, and then persist the runner's state. The output watermark is advanced along with it
- * only while {@link #autoAdvanceOutputWatermark} is true.
- */
- public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
- ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
- timerInternals.advanceInputWatermark(runner, newInputWatermark);
- runner.persist();
- }
-
- /**
- * Advance the output watermark to the given value. Intended for tests that have turned off
- * {@link #autoAdvanceOutputWatermark}; note that this method forwards the request
- * unconditionally and does not itself check that flag. The timer internals clip the value to
- * the current input watermark and reject moving the output watermark backwards.
- */
- public void advanceOutputWatermark(Instant newOutputWatermark) throws Exception {
- timerInternals.advanceOutputWatermark(newOutputWatermark);
- }
-
- /**
- * Advance the processing time to the specified time, firing any timers that should fire,
- * and then persist the runner's state.
- */
- public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
- ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
- timerInternals.advanceProcessingTime(runner, newProcessingTime);
- runner.persist();
- }
-
- /**
- * Advance the synchronized processing time to the specified time,
- * firing any timers that should fire, and then persist the runner's state.
- */
- public void advanceSynchronizedProcessingTime(Instant newProcessingTime) throws Exception {
- ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
- timerInternals.advanceSynchronizedProcessingTime(runner, newProcessingTime);
- runner.persist();
- }
-
- /**
-  * Inject all the timestamped values (after passing through the window function) as if they
-  * arrived in a single chunk of a bundle (or work-unit), then persist the runner's state.
-  *
-  * <p>A failure while assigning windows is rethrown as a {@link RuntimeException} wrapping the
-  * original exception.
-  */
- @SafeVarargs
- public final void injectElements(TimestampedValue<InputT>... values) throws Exception {
-   for (TimestampedValue<InputT> value : values) {
-     // Label the trace with this class; the earlier "TriggerTester" label did not match it.
-     WindowTracing.trace("ReduceFnTester.injectElements: {}", value);
-   }
-
-   // Turns each timestamped value into a windowed value by running it through the window
-   // function with a stub assign context.
-   Function<TimestampedValue<InputT>, WindowedValue<InputT>> assignWindows =
-       new Function<TimestampedValue<InputT>, WindowedValue<InputT>>() {
-         @Override
-         public WindowedValue<InputT> apply(TimestampedValue<InputT> input) {
-           try {
-             InputT value = input.getValue();
-             Instant timestamp = input.getTimestamp();
-             Collection<W> windows =
-                 windowFn.assignWindows(
-                     new TestAssignContext<W>(
-                         windowFn, value, timestamp, GlobalWindow.INSTANCE));
-             return WindowedValue.of(value, timestamp, windows, PaneInfo.NO_FIRING);
-           } catch (Exception e) {
-             throw new RuntimeException(e);
-           }
-         }
-       };
-
-   ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-   runner.processElements(Iterables.transform(Arrays.asList(values), assignWindows));
-
-   // Persist after each bundle.
-   runner.persist();
- }
-
- /**
-  * Delivers a single timer for {@code window} at {@code timestamp} in {@code domain} directly to
-  * a new runner, bypassing the timer queues, and then persists the runner's state.
-  */
- public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception {
-   ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-   TimerData timer = TimerData.of(windowNamespace(window), timestamp, domain);
-   runner.onTimer(timer);
-   runner.persist();
- }
-
- /**
-  * In-memory state for the tester, extended with a few inspection helpers that the assertions
-  * and the watermark simulation rely on.
-  */
- private static class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> {
-
-   public TestInMemoryStateInternals(K key) {
-     super(key);
-   }
-
-   /** Returns the tags in {@code namespace} whose state is not empty. */
-   public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) {
-     Set<StateTag<? super K, ?>> nonEmptyTags = new HashSet<>();
-     for (Entry<StateTag<? super K, ?>, State> tagAndState :
-         inMemoryState.getTagsInUse(namespace).entrySet()) {
-       if (isEmptyForTesting(tagAndState.getValue())) {
-         continue;
-       }
-       nonEmptyTags.add(tagAndState.getKey());
-     }
-     return nonEmptyTags;
-   }
-
-   /** Returns the namespaces the underlying in-memory state reports as in use. */
-   public Set<StateNamespace> getNamespacesInUse() {
-     return inMemoryState.getNamespacesInUse();
-   }
-
-   /** Return the earliest output watermark hold in state, or null if none. */
-   public Instant earliestWatermarkHold() {
-     Instant earliest = null;
-     for (State storage : inMemoryState.values()) {
-       if (!(storage instanceof WatermarkHoldState)) {
-         continue;
-       }
-       Instant hold = ((WatermarkHoldState<?>) storage).read();
-       if (hold == null) {
-         // An unset hold never changes the minimum.
-         continue;
-       }
-       if (earliest == null || hold.isBefore(earliest)) {
-         earliest = hold;
-       }
-     }
-     return earliest;
-   }
- }
-
- /**
-  * Windowing internals for the tester: shares the simulated state, answers side input lookups
-  * through the supplied reader and records everything passed to {@link #outputWindowedValue}.
-  * The remaining operations are not expected to be used and throw.
-  */
- private class TestWindowingInternals implements WindowingInternals<InputT, KV<String, OutputT>> {
-   private List<WindowedValue<KV<String, OutputT>>> outputs = new ArrayList<>();
-   private SideInputReader sideInputReader;
-
-   private TestWindowingInternals(SideInputReader sideInputReader) {
-     this.sideInputReader = sideInputReader;
-   }
-
-   @Override
-   public void outputWindowedValue(KV<String, OutputT> output, Instant timestamp,
-       Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-     // Round-trip the value through its coder so that the captured copy is independent of the
-     // object the runner handed over.
-     KvCoder<String, OutputT> kvCoder = KvCoder.of(StringUtf8Coder.of(), outputCoder);
-     KV<String, OutputT> captured =
-         SerializableUtils.<KV<String, OutputT>>ensureSerializableByCoder(
-             kvCoder, output, "outputForWindow");
-     outputs.add(WindowedValue.of(captured, timestamp, windows, pane));
-   }
-
-   @Override
-   public TimerInternals timerInternals() {
-     throw new UnsupportedOperationException(
-         "Testing triggers should not use timers from WindowingInternals.");
-   }
-
-   @Override
-   public Collection<? extends BoundedWindow> windows() {
-     throw new UnsupportedOperationException(
-         "Testing triggers should not use windows from WindowingInternals.");
-   }
-
-   @Override
-   public PaneInfo pane() {
-     throw new UnsupportedOperationException(
-         "Testing triggers should not use pane from WindowingInternals.");
-   }
-
-   @Override
-   public <T> void writePCollectionViewData(
-       TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
-     throw new UnsupportedOperationException(
-         "Testing triggers should not use writePCollectionViewData from WindowingInternals.");
-   }
-
-   @Override
-   public StateInternals<Object> stateInternals() {
-     // Safe for testing only
-     @SuppressWarnings({"unchecked", "rawtypes"})
-     TestInMemoryStateInternals<Object> untyped = (TestInMemoryStateInternals) stateInternals;
-     return untyped;
-   }
-
-   @Override
-   public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-     if (sideInputReader.contains(view)) {
-       BoundedWindow sideInputWindow =
-           view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
-       return sideInputReader.get(view, sideInputWindow);
-     }
-     throw new IllegalArgumentException("calling sideInput() with unknown view");
-   }
- }
-
- /**
-  * Assign context that simply hands back the element, timestamp and window it was created with.
-  */
- private static class TestAssignContext<W extends BoundedWindow>
-     extends WindowFn<Object, W>.AssignContext {
-   private Object element;
-   private Instant timestamp;
-   private BoundedWindow window;
-
-   public TestAssignContext(
-       WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) {
-     // AssignContext is an inner class of WindowFn, so it needs an enclosing instance.
-     windowFn.super();
-     this.window = window;
-     this.timestamp = timestamp;
-     this.element = element;
-   }
-
-   @Override
-   public Object element() {
-     return this.element;
-   }
-
-   @Override
-   public Instant timestamp() {
-     return this.timestamp;
-   }
-
-   @Override
-   public BoundedWindow window() {
-     return this.window;
-   }
- }
-
- /** Aggregator that keeps a running {@code long} total in a plain field. */
- private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
-   private final String name;
-   private long sum = 0;
-
-   public InMemoryLongSumAggregator(String name) {
-     this.name = name;
-   }
-
-   @Override
-   public void addValue(Long value) {
-     this.sum = this.sum + value;
-   }
-
-   @Override
-   public String getName() {
-     return this.name;
-   }
-
-   @Override
-   public CombineFn<Long, ?, Long> getCombineFn() {
-     return new Sum.SumLongFn();
-   }
-
-   /** Returns the total of all values added so far. */
-   public long getSum() {
-     return this.sum;
-   }
- }
-
- /**
-  * Simulate the firing of timers and progression of input and output watermarks for a
-  * single computation and key in a Windmill-like streaming environment. Similar to
-  * {@link BatchTimerInternals}, but also tracks the output watermark.
-  */
- private class TestTimerInternals implements TimerInternals {
-   /** Timers currently set; a timer already in this set is not queued a second time. */
-   private Set<TimerData> existingTimers = new HashSet<>();
-
-   /** Pending input watermark timers, in timestamp order. */
-   private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
-
-   /** Pending processing time timers, in timestamp order. */
-   private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
-
-   /** Current input watermark. */
-   @Nullable
-   private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-   /** Current output watermark. */
-   @Nullable
-   private Instant outputWatermarkTime = null;
-
-   /** Current processing time. */
-   private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-   /** Current synchronized processing time. */
-   @Nullable
-   private Instant synchronizedProcessingTime = null;
-
-   /**
-    * Returns the timestamp of the earliest pending timer in {@code domain}, or {@code null} when
-    * no timer is pending there.
-    */
-   @Nullable
-   public Instant getNextTimer(TimeDomain domain) {
-     // peek() returns null for an empty queue. That is a valid "no timer" answer, so it must
-     // not be rejected with checkNotNull.
-     TimerData next = queue(domain).peek();
-     return next == null ? null : next.getTimestamp();
-   }
-
-   /**
-    * Returns the queue holding timers of {@code domain}. Processing time and synchronized
-    * processing time share one queue.
-    */
-   private PriorityQueue<TimerData> queue(TimeDomain domain) {
-     switch (domain) {
-       case EVENT_TIME:
-         return watermarkTimers;
-       case PROCESSING_TIME:
-       case SYNCHRONIZED_PROCESSING_TIME:
-         return processingTimers;
-     }
-     throw new RuntimeException(); // cases exhaustive
-   }
-
-   @Override
-   public void setTimer(TimerData timer) {
-     WindowTracing.trace("TestTimerInternals.setTimer: {}", timer);
-     if (existingTimers.add(timer)) {
-       queue(timer.getDomain()).add(timer);
-     }
-   }
-
-   @Override
-   public void deleteTimer(TimerData timer) {
-     WindowTracing.trace("TestTimerInternals.deleteTimer: {}", timer);
-     existingTimers.remove(timer);
-     queue(timer.getDomain()).remove(timer);
-   }
-
-   @Override
-   public Instant currentProcessingTime() {
-     return processingTime;
-   }
-
-   @Override
-   @Nullable
-   public Instant currentSynchronizedProcessingTime() {
-     return synchronizedProcessingTime;
-   }
-
-   @Override
-   public Instant currentInputWatermarkTime() {
-     return checkNotNull(inputWatermarkTime);
-   }
-
-   @Override
-   @Nullable
-   public Instant currentOutputWatermarkTime() {
-     return outputWatermarkTime;
-   }
-
-   @Override
-   public String toString() {
-     return MoreObjects.toStringHelper(getClass())
-         .add("watermarkTimers", watermarkTimers)
-         .add("processingTimers", processingTimers)
-         .add("inputWatermarkTime", inputWatermarkTime)
-         .add("outputWatermarkTime", outputWatermarkTime)
-         .add("processingTime", processingTime)
-         .toString();
-   }
-
-   /**
-    * Moves the input watermark forward to {@code newInputWatermark}, fires the event-time timers
-    * that are now due and, if auto-advance is enabled, moves the output watermark to the earliest
-    * watermark hold (or to the input watermark when nothing is held).
-    *
-    * @throws IllegalStateException if the new value lies before the current input watermark
-    */
-   public void advanceInputWatermark(
-       ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark) throws Exception {
-     checkNotNull(newInputWatermark);
-     checkState(
-         !newInputWatermark.isBefore(inputWatermarkTime),
-         "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
-         newInputWatermark);
-     WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}",
-         inputWatermarkTime, newInputWatermark);
-     inputWatermarkTime = newInputWatermark;
-     advanceAndFire(runner, newInputWatermark, TimeDomain.EVENT_TIME);
-
-     Instant hold = stateInternals.earliestWatermarkHold();
-     if (hold == null) {
-       WindowTracing.trace("TestTimerInternals.advanceInputWatermark: no holds, "
-           + "so output watermark = input watermark");
-       hold = inputWatermarkTime;
-     }
-     if (autoAdvanceOutputWatermark) {
-       advanceOutputWatermark(hold);
-     }
-   }
-
-   /**
-    * Moves the output watermark forward to {@code newOutputWatermark}, clipped so that it never
-    * passes the input watermark.
-    *
-    * @throws IllegalStateException if the (clipped) value lies before the current output watermark
-    */
-   public void advanceOutputWatermark(Instant newOutputWatermark) {
-     checkNotNull(newOutputWatermark);
-     if (newOutputWatermark.isAfter(inputWatermarkTime)) {
-       WindowTracing.trace(
-           "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}",
-           newOutputWatermark, inputWatermarkTime);
-       newOutputWatermark = inputWatermarkTime;
-     }
-     checkState(
-         outputWatermarkTime == null || !newOutputWatermark.isBefore(outputWatermarkTime),
-         "Cannot move output watermark time backwards from %s to %s", outputWatermarkTime,
-         newOutputWatermark);
-     WindowTracing.trace("TestTimerInternals.advanceOutputWatermark: from {} to {}",
-         outputWatermarkTime, newOutputWatermark);
-     outputWatermarkTime = newOutputWatermark;
-   }
-
-   /**
-    * Moves the processing time forward and fires the processing-time timers that are now due.
-    *
-    * @throws IllegalStateException if the new value lies before the current processing time
-    */
-   public void advanceProcessingTime(
-       ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime) throws Exception {
-     checkState(!newProcessingTime.isBefore(processingTime),
-         "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime);
-     WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} to {}", processingTime,
-         newProcessingTime);
-     processingTime = newProcessingTime;
-     advanceAndFire(runner, newProcessingTime, TimeDomain.PROCESSING_TIME);
-   }
-
-   /**
-    * Moves the synchronized processing time forward and fires the timers that are now due in the
-    * synchronized processing time domain.
-    *
-    * @throws IllegalStateException if the new value lies before the current synchronized
-    *     processing time
-    */
-   public void advanceSynchronizedProcessingTime(
-       ReduceFnRunner<?, ?, ?, ?> runner, Instant newSynchronizedProcessingTime) throws Exception {
-     checkNotNull(newSynchronizedProcessingTime);
-     // The synchronized processing time starts out unset. Joda-Time treats a null argument to
-     // isBefore() as "now", so the comparison is only meaningful once a value has been recorded.
-     checkState(
-         synchronizedProcessingTime == null
-             || !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
-         "Cannot move synchronized processing time backwards from %s to %s",
-         synchronizedProcessingTime, newSynchronizedProcessingTime);
-     WindowTracing.trace(
-         "TestTimerInternals.advanceSynchronizedProcessingTime: from {} to {}",
-         synchronizedProcessingTime, newSynchronizedProcessingTime);
-     synchronizedProcessingTime = newSynchronizedProcessingTime;
-     advanceAndFire(
-         runner, newSynchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-   }
-
-   /**
-    * Fires, in queue order, every timer of {@code domain} whose timestamp lies strictly before
-    * {@code currentTime}, including timers that are added while firing.
-    */
-   private void advanceAndFire(
-       ReduceFnRunner<?, ?, ?, ?> runner, Instant currentTime, TimeDomain domain)
-       throws Exception {
-     PriorityQueue<TimerData> queue = queue(domain);
-     boolean shouldFire = false;
-
-     do {
-       TimerData timer = queue.peek();
-       // Timers fire when the current time progresses past the timer time.
-       shouldFire = timer != null && currentTime.isAfter(timer.getTimestamp());
-       if (shouldFire) {
-         WindowTracing.trace(
-             "TestTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime);
-         // Remove before firing, so that if the trigger adds another identical
-         // timer we don't remove it.
-         queue.remove();
-         // Also forget the timer in the duplicate-tracking set; otherwise setTimer would
-         // silently ignore an identical timer registered during or after this firing.
-         existingTimers.remove(timer);
-
-         runner.onTimer(timer);
-       }
-     } while (shouldFire);
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
deleted file mode 100644
index 156b4a9..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.is;
-import static org.mockito.Mockito.mock;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.BaseExecutionContext.StepContext;
-import org.apache.beam.sdk.values.TupleTag;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests how {@link SimpleDoFnRunner} surfaces exceptions thrown while processing an element.
- */
-@RunWith(JUnit4.class)
-public class SimpleDoFnRunnerTest {
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  /** An exception thrown by an ordinary function is expected as the cause of a wrapper. */
-  @Test
-  public void testExceptionsWrappedAsUserCodeException() {
-    ThrowingDoFn throwingFn = new ThrowingDoFn();
-    DoFnRunner<String, String> runner = createRunner(throwingFn);
-
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(is(throwingFn.exceptionToThrow));
-
-    runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
-  }
-
-  /** An exception thrown by a system-internal function is expected to surface as-is. */
-  @Test
-  public void testSystemDoFnInternalExceptionsNotWrapped() {
-    ThrowingSystemDoFn throwingFn = new ThrowingSystemDoFn();
-    DoFnRunner<String, String> runner = createRunner(throwingFn);
-
-    thrown.expect(is(throwingFn.exceptionToThrow));
-
-    runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
-  }
-
-  /** Builds a runner around {@code fn}, leaving every collaborator the tests do not need null. */
-  private DoFnRunner<String, String> createRunner(OldDoFn<String, String> fn) {
-    // Pass in only necessary parameters for the test
-    List<TupleTag<?>> noSideOutputs = Arrays.asList();
-    StepContext stepContext = mock(StepContext.class);
-    return new SimpleDoFnRunner<>(
-        null, fn, null, null, null, noSideOutputs, stepContext, null, null);
-  }
-
-  /** Function that throws the same, pre-built exception for every element. */
-  static class ThrowingDoFn extends OldDoFn<String, String> {
-    final Exception exceptionToThrow =
-        new UnsupportedOperationException("Expected exception");
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      throw exceptionToThrow;
-    }
-  }
-
-  /** Same throwing function, but marked as system-internal. */
-  @SystemDoFnInternal
-  static class ThrowingSystemDoFn extends ThrowingDoFn {
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 2da70bb..c08c229 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -20,6 +20,10 @@ package org.apache.beam.runners.direct;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
@@ -28,11 +32,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.SystemReduceFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index f085a39..17dc0be 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -25,6 +25,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
@@ -34,8 +36,6 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItems;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 85a1c6a..99ab22a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -23,15 +23,15 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
-import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
-import org.apache.beam.sdk.util.PushbackSideInputDoFnRunner;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
index 41f7e8d..d40dc11 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
@@ -18,9 +18,9 @@
package org.apache.beam.runners.direct;
import java.util.Map;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 07e2191..3719fa8 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -63,7 +63,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.Reshuffle;
-import org.apache.beam.sdk.util.SystemReduceFn;
+import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 2c7ebc6..3b0fccc 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -40,11 +40,11 @@ import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.NullSideInputReader;
-import org.apache.beam.sdk.util.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 01cfa5b..b893116 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -43,7 +43,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.SystemReduceFn;
+import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 9f1a839..03db811 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.spark;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkPipelineEvaluator;
@@ -36,7 +37,6 @@ import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index eaceb85..8341c6d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -37,6 +37,11 @@ import java.util.Map;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.beam.runners.core.AssignWindowsDoFn;
+import org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper;
@@ -61,11 +66,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.AssignWindowsDoFn;
-import org.apache.beam.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
-import org.apache.beam.sdk.util.SystemReduceFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.util.WindowingStrategy;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 43dcef6..c55be3d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import kafka.serializer.Decoder;
+import org.apache.beam.runners.core.AssignWindowsDoFn;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.io.KafkaIO;
@@ -49,7 +50,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.AssignWindowsDoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionList;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index cd50408..3c01690 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -110,7 +110,7 @@
<!--[BEAM-419] Non-transient non-serializable instance field in serializable class-->
</Match>
<Match>
- <Class name="org.apache.beam.sdk.util.WatermarkHold"/>
+ <Class name="org.apache.beam.runners.core.WatermarkHold"/>
<Field name="timerInternals"/>
<Bug pattern="SE_BAD_FIELD"/>
<!--[BEAM-420] Non-transient non-serializable instance field in serializable class-->
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
index fde90af..72524bd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.CoderException;
/**
* Coder for the BitSet used to track child-trigger finished states.
*/
-class BitSetCoder extends AtomicCoder<BitSet> {
+public class BitSetCoder extends AtomicCoder<BitSet> {
private static final BitSetCoder INSTANCE = new BitSetCoder();
private static final ByteArrayCoder byteArrayCoder = ByteArrayCoder.of();