You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/22 02:22:19 UTC
[05/12] incubator-beam git commit: Move some easy stuff into
runners/core-java
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
deleted file mode 100644
index 65fc52d..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-/**
- * An implementation of {@link GroupByKey} built on top of a lower-level {@link GroupByKeyOnly}
- * primitive.
- *
- * <p>This implementation of {@link GroupByKey} proceeds via the following steps:
- * <ol>
- * <li>{@code ReifyTimestampsAndWindowsDoFn ParDo(ReifyTimestampsAndWindows)}: This embeds
- * the previously-implicit timestamp and window into the elements themselves, so a
- * window-and-timestamp-unaware transform can operate on them.</li>
- * <li>{@code GroupByKeyOnly}: This lower-level primitive groups by keys, ignoring windows
- * and timestamps. Many window-unaware runners have such a primitive already.</li>
- * <li>{@code SortValuesByTimestamp ParDo(SortValuesByTimestamp)}: The values in the iterables
- * output by {@link GroupByKeyOnly} are sorted by timestamp.</li>
- * <li>{@code GroupAlsoByWindow}: This primitive processes the sorted values. Today it is
- * implemented as a {@link ParDo} that calls reserved internal methods.</li>
- * </ol>
- *
- * <p>This implementation of {@link GroupByKey} has severe limitations unless its component
- * transforms are replaced. As-is, it is only applicable for in-memory runners using a batch-style
- * execution strategy. Specifically:
- *
- * <ul>
- * <li>Every iterable output by {@link GroupByKeyOnly} must contain all elements for that key.
- * A streaming-style partition, with multiple elements for the same key, will not yield
- * correct results.</li>
- * <li>Sorting of values by timestamp is performed on an in-memory list. It will not succeed
- * for large iterables.</li>
- * <li>The implementation of {@code GroupAlsoByWindow} does not support timers. This is only
- * appropriate for runners which also do not support timers.</li>
- * </ul>
- */
-public class GroupByKeyViaGroupByKeyOnly<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-
- private GroupByKey<K, V> gbkTransform;
-
- public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) {
- this.gbkTransform = originalTransform;
- }
-
- @Override
- public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
- WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
- return input
- // Make each input element's timestamp and assigned windows
- // explicit, in the value part.
- .apply(new ReifyTimestampsAndWindows<K, V>())
-
- // Group by just the key.
- // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
- // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
- // introduced in here.
- .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
-
- // Sort each key's values by timestamp. GroupAlsoByWindow requires
- // its input to be sorted by timestamp.
- .apply(new SortValuesByTimestamp<K, V>())
-
- // Group each key's values by window, merging windows as needed.
- .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
-
- // And update the windowing strategy as appropriate.
- .setWindowingStrategyInternal(
- gbkTransform.updateWindowingStrategy(windowingStrategy));
- }
-
- /**
- * Runner-specific primitive that groups by key only, ignoring any window assignments. A
- * runner that uses {@link GroupByKeyViaGroupByKeyOnly} should have a primitive way to translate
- * or evaluate this class.
- */
- public static class GroupByKeyOnly<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- @Override
- public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
- return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
- }
-
- @Override
- public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
- return GroupByKey.getOutputKvCoder(input.getCoder());
- }
- }
-
- /**
- * Helper transform that makes timestamps and window assignments explicit in the value part of
- * each key/value pair.
- */
- public static class ReifyTimestampsAndWindows<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, WindowedValue<V>>>> {
-
- @Override
- public PCollection<KV<K, WindowedValue<V>>> apply(PCollection<KV<K, V>> input) {
-
- // The requirement to use a KvCoder *is* actually a model-level requirement, not specific
- // to this implementation of GBK. All runners need a way to get the key.
- checkArgument(
- input.getCoder() instanceof KvCoder,
- "%s requires its input to use a %s",
- GroupByKey.class.getSimpleName(),
- KvCoder.class.getSimpleName());
-
- @SuppressWarnings("unchecked")
- KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
- Coder<K> keyCoder = inputKvCoder.getKeyCoder();
- Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
- Coder<WindowedValue<V>> outputValueCoder =
- FullWindowedValueCoder.of(
- inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
- Coder<KV<K, WindowedValue<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
- return input
- .apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
- .setCoder(outputKvCoder);
- }
- }
-
- /**
- * Helper transform that sorts the values associated with each key by timestamp.
- */
- private static class SortValuesByTimestamp<K, V>
- extends PTransform<
- PCollection<KV<K, Iterable<WindowedValue<V>>>>,
- PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
- @Override
- public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
- PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
- return input
- .apply(
- ParDo.of(
- new DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<WindowedValue<V>>>>() {
- @Override
- public void processElement(ProcessContext c) {
- KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
- K key = kvs.getKey();
- Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
- List<WindowedValue<V>> sortedValues = new ArrayList<>();
- for (WindowedValue<V> value : unsortedValues) {
- sortedValues.add(value);
- }
- Collections.sort(
- sortedValues,
- new Comparator<WindowedValue<V>>() {
- @Override
- public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
- return e1.getTimestamp().compareTo(e2.getTimestamp());
- }
- });
- c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
- }
- }))
- .setCoder(input.getCoder());
- }
- }
-
- /**
- * Helper transform that takes a collection of timestamp-ordered
- * values associated with each key, groups the values by window,
- * combines windows as needed, and for each window in each key,
- * outputs a collection of key/value-list pairs implicitly assigned
- * to the window and with the timestamp derived from that window.
- */
- private static class GroupAlsoByWindow<K, V>
- extends PTransform<
- PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<V>>>> {
- private final WindowingStrategy<?, ?> windowingStrategy;
-
- public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
- this.windowingStrategy = windowingStrategy;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public PCollection<KV<K, Iterable<V>>> apply(
- PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
- @SuppressWarnings("unchecked")
- KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
- (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
-
- Coder<K> keyCoder = inputKvCoder.getKeyCoder();
- Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder();
-
- IterableCoder<WindowedValue<V>> inputIterableValueCoder =
- (IterableCoder<WindowedValue<V>>) inputValueCoder;
- Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
- WindowedValueCoder<V> inputIterableWindowedValueCoder =
- (WindowedValueCoder<V>) inputIterableElementCoder;
-
- Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
- Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder);
- Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
-
- return input
- .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
- .setCoder(outputKvCoder);
- }
-
- private <W extends BoundedWindow>
- GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn(
- WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
- return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
- strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
deleted file mode 100644
index 4815162..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.KV;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-
-import org.joda.time.Instant;
-
-/**
- * A customized {@link DoFnRunner} that handles late data dropping for
- * a {@link KeyedWorkItem} input {@link DoFn}.
- *
- * <p>It expands windows before checking data lateness.
- *
- * <p>{@link KeyedWorkItem KeyedWorkItems} are always in empty windows.
- *
- * @param <K> key type
- * @param <InputT> input value element type
- * @param <OutputT> output value element type
- * @param <W> window type
- */
-public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWindow>
- implements DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
- private final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner;
- private final LateDataFilter lateDataFilter;
-
- public LateDataDroppingDoFnRunner(
- DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner,
- WindowingStrategy<?, ?> windowingStrategy,
- TimerInternals timerInternals,
- Aggregator<Long, Long> droppedDueToLateness) {
- this.doFnRunner = doFnRunner;
- lateDataFilter = new LateDataFilter(windowingStrategy, timerInternals, droppedDueToLateness);
- }
-
- @Override
- public void startBundle() {
- doFnRunner.startBundle();
- }
-
- @Override
- public void processElement(WindowedValue<KeyedWorkItem<K, InputT>> elem) {
- Iterable<WindowedValue<InputT>> nonLateElements = lateDataFilter.filter(
- elem.getValue().key(), elem.getValue().elementsIterable());
- KeyedWorkItem<K, InputT> keyedWorkItem = KeyedWorkItems.workItem(
- elem.getValue().key(), elem.getValue().timersIterable(), nonLateElements);
- doFnRunner.processElement(elem.withValue(keyedWorkItem));
- }
-
- @Override
- public void finishBundle() {
- doFnRunner.finishBundle();
- }
-
- /**
- * It filters late data in a {@link KeyedWorkItem}.
- */
- @VisibleForTesting
- static class LateDataFilter {
- private final WindowingStrategy<?, ?> windowingStrategy;
- private final TimerInternals timerInternals;
- private final Aggregator<Long, Long> droppedDueToLateness;
-
- public LateDataFilter(
- WindowingStrategy<?, ?> windowingStrategy,
- TimerInternals timerInternals,
- Aggregator<Long, Long> droppedDueToLateness) {
- this.windowingStrategy = windowingStrategy;
- this.timerInternals = timerInternals;
- this.droppedDueToLateness = droppedDueToLateness;
- }
-
- /**
- * Returns an {@code Iterable<WindowedValue<InputT>>} that only contains
- * non-late input elements.
- */
- public <K, InputT> Iterable<WindowedValue<InputT>> filter(
- final K key, Iterable<WindowedValue<InputT>> elements) {
- Iterable<Iterable<WindowedValue<InputT>>> windowsExpandedElements = Iterables.transform(
- elements,
- new Function<WindowedValue<InputT>, Iterable<WindowedValue<InputT>>>() {
- @Override
- public Iterable<WindowedValue<InputT>> apply(final WindowedValue<InputT> input) {
- return Iterables.transform(
- input.getWindows(),
- new Function<BoundedWindow, WindowedValue<InputT>>() {
- @Override
- public WindowedValue<InputT> apply(BoundedWindow window) {
- return WindowedValue.of(
- input.getValue(), input.getTimestamp(), window, input.getPane());
- }
- });
- }});
-
- Iterable<WindowedValue<InputT>> nonLateElements = Iterables.filter(
- Iterables.concat(windowsExpandedElements),
- new Predicate<WindowedValue<InputT>>() {
- @Override
- public boolean apply(WindowedValue<InputT> input) {
- BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
- if (canDropDueToExpiredWindow(window)) {
- // The element is too late for this window.
- droppedDueToLateness.addValue(1L);
- WindowTracing.debug(
- "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
- + "since too far behind inputWatermark:{}; outputWatermark:{}",
- input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
- timerInternals.currentOutputWatermarkTime());
- return false;
- } else {
- return true;
- }
- }
- });
- return nonLateElements;
- }
-
- /** Is {@code window} expired w.r.t. the garbage collection watermark? */
- private boolean canDropDueToExpiredWindow(BoundedWindow window) {
- Instant inputWM = timerInternals.currentInputWatermarkTime();
- return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java
deleted file mode 100644
index e809c24..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-
-/**
- * Tracks which windows have non-empty panes. Specifically, which windows have new elements since
- * their last triggering.
- *
- * @param <W> The kind of windows being tracked.
- */
-public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
-
- static <K, W extends BoundedWindow> NonEmptyPanes<K, W> create(
- WindowingStrategy<?, W> strategy, ReduceFn<K, ?, ?, W> reduceFn) {
- if (strategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES) {
- return new DiscardingModeNonEmptyPanes<>(reduceFn);
- } else {
- return new GeneralNonEmptyPanes<>();
- }
- }
-
- /**
- * Record that some content has been added to the window in {@code context}, and therefore the
- * current pane is not empty.
- */
- public abstract void recordContent(StateAccessor<K> context);
-
- /**
- * Record that the given pane is empty.
- */
- public abstract void clearPane(StateAccessor<K> state);
-
- /**
- * Return true if the current pane for the window in {@code context} is empty.
- */
- public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context);
-
- /**
- * Prefetch in preparation for merging.
- */
- public abstract void prefetchOnMerge(MergingStateAccessor<K, W> state);
-
- /**
- * Eagerly merge backing state.
- */
- public abstract void onMerge(MergingStateAccessor<K, W> context);
-
- /**
- * An implementation of {@code NonEmptyPanes} optimized for use with discarding mode. Uses the
- * presence of data in the accumulation buffer to record non-empty panes.
- */
- private static class DiscardingModeNonEmptyPanes<K, W extends BoundedWindow>
- extends NonEmptyPanes<K, W> {
-
- private ReduceFn<K, ?, ?, W> reduceFn;
-
- private DiscardingModeNonEmptyPanes(ReduceFn<K, ?, ?, W> reduceFn) {
- this.reduceFn = reduceFn;
- }
-
- @Override
- public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
- return reduceFn.isEmpty(state);
- }
-
- @Override
- public void recordContent(StateAccessor<K> state) {
- // Nothing to do -- the reduceFn is tracking contents
- }
-
- @Override
- public void clearPane(StateAccessor<K> state) {
- // Nothing to do -- the reduceFn is tracking contents
- }
-
- @Override
- public void prefetchOnMerge(MergingStateAccessor<K, W> state) {
- // Nothing to do -- the reduceFn is tracking contents
- }
-
- @Override
- public void onMerge(MergingStateAccessor<K, W> context) {
- // Nothing to do -- the reduceFn is tracking contents
- }
- }
-
- /**
- * An implementation of {@code NonEmptyPanes} for general use.
- */
- private static class GeneralNonEmptyPanes<K, W extends BoundedWindow>
- extends NonEmptyPanes<K, W> {
-
- private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
- PANE_ADDITIONS_TAG =
- StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
- "count", VarLongCoder.of(), new Sum.SumLongFn()));
-
- @Override
- public void recordContent(StateAccessor<K> state) {
- state.access(PANE_ADDITIONS_TAG).add(1L);
- }
-
- @Override
- public void clearPane(StateAccessor<K> state) {
- state.access(PANE_ADDITIONS_TAG).clear();
- }
-
- @Override
- public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
- return state.access(PANE_ADDITIONS_TAG).isEmpty();
- }
-
- @Override
- public void prefetchOnMerge(MergingStateAccessor<K, W> state) {
- StateMerging.prefetchCombiningValues(state, PANE_ADDITIONS_TAG);
- }
-
- @Override
- public void onMerge(MergingStateAccessor<K, W> context) {
- StateMerging.mergeCombiningValues(context, PANE_ADDITIONS_TAG);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
deleted file mode 100644
index 5e08031..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.ValueState;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Instant;
-
-/**
- * Determine the timing and other properties of a new pane for a given computation, key and window.
- * Incorporates any previous pane, whether the pane has been produced because of an
- * on-time {@link AfterWatermark} trigger firing, and the relation between the element's timestamp
- * and the current output watermark.
- */
-public class PaneInfoTracker {
- private TimerInternals timerInternals;
-
- public PaneInfoTracker(TimerInternals timerInternals) {
- this.timerInternals = timerInternals;
- }
-
- @VisibleForTesting
- static final StateTag<Object, ValueState<PaneInfo>> PANE_INFO_TAG =
- StateTags.makeSystemTagInternal(StateTags.value("pane", PaneInfoCoder.INSTANCE));
-
- public void clear(StateAccessor<?> state) {
- state.access(PANE_INFO_TAG).clear();
- }
-
- /**
- * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
- * info includes the timing for the pane, whose calculation is quite subtle.
- *
- * @param isFinal should be {@code true} only if the triggering machinery can guarantee
- * no further firings for the key and window of {@code context}.
- */
- public ReadableState<PaneInfo> getNextPaneInfo(
- ReduceFn<?, ?, ?, ?>.Context context, final boolean isFinal) {
- final Object key = context.key();
- final ReadableState<PaneInfo> previousPaneFuture =
- context.state().access(PaneInfoTracker.PANE_INFO_TAG);
- final Instant windowMaxTimestamp = context.window().maxTimestamp();
-
- return new ReadableState<PaneInfo>() {
- @Override
- public ReadableState<PaneInfo> readLater() {
- previousPaneFuture.readLater();
- return this;
- }
-
- @Override
- public PaneInfo read() {
- PaneInfo previousPane = previousPaneFuture.read();
- return describePane(key, windowMaxTimestamp, previousPane, isFinal);
- }
- };
- }
-
- public void storeCurrentPaneInfo(ReduceFn<?, ?, ?, ?>.Context context, PaneInfo currentPane) {
- context.state().access(PANE_INFO_TAG).write(currentPane);
- }
-
- private <W> PaneInfo describePane(
- Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean isFinal) {
- boolean isFirst = previousPane == null;
- Timing previousTiming = isFirst ? null : previousPane.getTiming();
- long index = isFirst ? 0 : previousPane.getIndex() + 1;
- long nonSpeculativeIndex = isFirst ? 0 : previousPane.getNonSpeculativeIndex() + 1;
- Instant outputWM = timerInternals.currentOutputWatermarkTime();
- Instant inputWM = timerInternals.currentInputWatermarkTime();
-
- // True if it is not possible to assign the element representing this pane a timestamp
- // which will make an ON_TIME pane for any following computation.
- // I.e., true if the element's latest possible timestamp is before the current output watermark.
- boolean isLateForOutput = outputWM != null && windowMaxTimestamp.isBefore(outputWM);
-
- // True if all emitted panes (if any) were EARLY panes.
- // Once the ON_TIME pane has fired, all following panes must be considered LATE even
- // if the output watermark is behind the end of the window.
- boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;
-
- // True if the input watermark hasn't passed the window's max timestamp.
- boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp);
-
- Timing timing;
- if (isLateForOutput || !onlyEarlyPanesSoFar) {
- // The output watermark has already passed the end of this window, or we have already
- // emitted a non-EARLY pane. Irrespective of how this pane was triggered we must
- // consider this pane LATE.
- timing = Timing.LATE;
- } else if (isEarlyForInput) {
- // This is an EARLY firing.
- timing = Timing.EARLY;
- nonSpeculativeIndex = -1;
- } else {
- // This is the unique ON_TIME firing for the window.
- timing = Timing.ON_TIME;
- }
-
- WindowTracing.debug(
- "describePane: {} pane (prev was {}) for key:{}; windowMaxTimestamp:{}; "
- + "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}",
- timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isLateForOutput);
-
- if (previousPane != null) {
- // Timing transitions should follow EARLY* ON_TIME? LATE*
- switch (previousTiming) {
- case EARLY:
- Preconditions.checkState(
- timing == Timing.EARLY || timing == Timing.ON_TIME || timing == Timing.LATE,
- "EARLY cannot transition to %s", timing);
- break;
- case ON_TIME:
- Preconditions.checkState(
- timing == Timing.LATE, "ON_TIME cannot transition to %s", timing);
- break;
- case LATE:
- Preconditions.checkState(timing == Timing.LATE, "LATE cannot transtion to %s", timing);
- break;
- case UNKNOWN:
- break;
- }
- Preconditions.checkState(!previousPane.isLast(), "Last pane was not last after all.");
- }
-
- return PaneInfo.createPane(isFirst, isFinal, timing, index, nonSpeculativeIndex);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
deleted file mode 100644
index b1442dd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
- * them from {@link #processElementInReadyWindows(WindowedValue)}.
- */
-public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
- private final DoFnRunner<InputT, OutputT> underlying;
- private final Collection<PCollectionView<?>> views;
- private final ReadyCheckingSideInputReader sideInputReader;
-
- private Set<BoundedWindow> notReadyWindows;
-
- public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create(
- DoFnRunner<InputT, OutputT> underlying,
- Collection<PCollectionView<?>> views,
- ReadyCheckingSideInputReader sideInputReader) {
- return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
- }
-
- private PushbackSideInputDoFnRunner(
- DoFnRunner<InputT, OutputT> underlying,
- Collection<PCollectionView<?>> views,
- ReadyCheckingSideInputReader sideInputReader) {
- this.underlying = underlying;
- this.views = views;
- this.sideInputReader = sideInputReader;
- }
-
- @Override
- public void startBundle() {
- notReadyWindows = new HashSet<>();
- underlying.startBundle();
- }
-
- /**
- * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
- * for each window the element is in that is ready.
- *
- * @param elem the element to process in all ready windows
- * @return each element that could not be processed because it requires a side input window
- * that is not ready.
- */
- public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
- if (views.isEmpty()) {
- processElement(elem);
- return Collections.emptyList();
- }
- ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
- for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
- BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
- boolean isReady = !notReadyWindows.contains(mainInputWindow);
- for (PCollectionView<?> view : views) {
- BoundedWindow sideInputWindow =
- view.getWindowingStrategyInternal()
- .getWindowFn()
- .getSideInputWindow(mainInputWindow);
- if (!sideInputReader.isReady(view, sideInputWindow)) {
- isReady = false;
- break;
- }
- }
- if (isReady) {
- processElement(windowElem);
- } else {
- notReadyWindows.add(mainInputWindow);
- pushedBack.add(windowElem);
- }
- }
- return pushedBack.build();
- }
-
- @Override
- public void processElement(WindowedValue<InputT> elem) {
- underlying.processElement(elem);
- }
-
- /**
- * Call the underlying {@link DoFnRunner#finishBundle()}.
- */
- @Override
- public void finishBundle() {
- notReadyWindows = null;
- underlying.finishBundle();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFn.java
deleted file mode 100644
index c5ee1e1..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFn.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateAccessor;
-
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-
-/**
- * Specification for processing to happen after elements have been grouped by key.
- *
- * @param <K> The type of key being processed.
- * @param <InputT> The type of input values associated with the key.
- * @param <OutputT> The output type that will be produced for each key.
- * @param <W> The type of windows this operates on.
- */
-public abstract class ReduceFn<K, InputT, OutputT, W extends BoundedWindow>
- implements Serializable {
-
- /** Information accessible to all the processing methods in this {@code ReduceFn}. */
- public abstract class Context {
- /** Return the key that is being processed. */
- public abstract K key();
-
- /** The window that is being processed. */
- public abstract W window();
-
- /** Access the current {@link WindowingStrategy}. */
- public abstract WindowingStrategy<?, W> windowingStrategy();
-
- /** Return the interface for accessing state. */
- public abstract StateAccessor<K> state();
-
- /** Return the interface for accessing timers. */
- public abstract Timers timers();
- }
-
- /** Information accessible within {@link #processValue}. */
- public abstract class ProcessValueContext extends Context {
- /** Return the actual value being processed. */
- public abstract InputT value();
-
- /** Return the timestamp associated with the value. */
- public abstract Instant timestamp();
- }
-
- /** Information accessible within {@link #onMerge}. */
- public abstract class OnMergeContext extends Context {
- /** Return the interface for accessing state. */
- @Override
- public abstract MergingStateAccessor<K, W> state();
- }
-
- /** Information accessible within {@link #onTrigger}. */
- public abstract class OnTriggerContext extends Context {
- /** Returns the {@link PaneInfo} for the trigger firing being processed. */
- public abstract PaneInfo paneInfo();
-
- /** Output the given value in the current window. */
- public abstract void output(OutputT value);
- }
-
- //////////////////////////////////////////////////////////////////////////////////////////////////
-
- /**
- * Called for each value of type {@code InputT} associated with the current key.
- */
- public abstract void processValue(ProcessValueContext c) throws Exception;
-
- /**
- * Called when windows are merged.
- */
- public abstract void onMerge(OnMergeContext context) throws Exception;
-
- /**
- * Called when triggers fire.
- *
- * <p>Implementations of {@link ReduceFn} should call {@link OnTriggerContext#output} to emit
- * any results that should be included in the pane produced by this trigger firing.
- */
- public abstract void onTrigger(OnTriggerContext context) throws Exception;
-
- /**
- * Called before {@link #onMerge} is invoked to provide an opportunity to prefetch any needed
- * state.
- *
- * @param c Context to prefetch from.
- */
- public void prefetchOnMerge(MergingStateAccessor<K, W> c) throws Exception {}
-
- /**
- * Called before {@link #onTrigger} is invoked to provide an opportunity to prefetch any needed
- * state.
- *
- * @param context Context to prefetch from.
- */
- public void prefetchOnTrigger(StateAccessor<K> context) {}
-
- /**
- * Called to clear any persisted state that the {@link ReduceFn} may be holding. This will be
- * called when the window is closing and will receive no future interactions.
- */
- public abstract void clearState(Context context) throws Exception;
-
- /**
- * Returns true if there is no buffered state.
- */
- public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java
deleted file mode 100644
index c90940e..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
-import org.apache.beam.sdk.util.state.StateTag;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * Factory for creating instances of the various {@link ReduceFn} contexts.
- */
-class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
- public interface OnTriggerCallbacks<OutputT> {
- void output(OutputT toOutput);
- }
-
- private final K key;
- private final ReduceFn<K, InputT, OutputT, W> reduceFn;
- private final WindowingStrategy<?, W> windowingStrategy;
- private final StateInternals<K> stateInternals;
- private final ActiveWindowSet<W> activeWindows;
- private final TimerInternals timerInternals;
- private final WindowingInternals<?, ?> windowingInternals;
- private final PipelineOptions options;
-
- ReduceFnContextFactory(K key, ReduceFn<K, InputT, OutputT, W> reduceFn,
- WindowingStrategy<?, W> windowingStrategy, StateInternals<K> stateInternals,
- ActiveWindowSet<W> activeWindows, TimerInternals timerInternals,
- WindowingInternals<?, ?> windowingInternals, PipelineOptions options) {
- this.key = key;
- this.reduceFn = reduceFn;
- this.windowingStrategy = windowingStrategy;
- this.stateInternals = stateInternals;
- this.activeWindows = activeWindows;
- this.timerInternals = timerInternals;
- this.windowingInternals = windowingInternals;
- this.options = options;
- }
-
- /** Where should we look for state associated with a given window? */
- public static enum StateStyle {
- /** All state is associated with the window itself. */
- DIRECT,
- /** State is associated with the 'state address' windows tracked by the active window set. */
- RENAMED
- }
-
- private StateAccessorImpl<K, W> stateAccessor(W window, StateStyle style) {
- return new StateAccessorImpl<K, W>(
- activeWindows, windowingStrategy.getWindowFn().windowCoder(),
- stateInternals, StateContexts.createFromComponents(options, windowingInternals, window),
- style);
- }
-
- public ReduceFn<K, InputT, OutputT, W>.Context base(W window, StateStyle style) {
- return new ContextImpl(stateAccessor(window, style));
- }
-
- public ReduceFn<K, InputT, OutputT, W>.ProcessValueContext forValue(
- W window, InputT value, Instant timestamp, StateStyle style) {
- return new ProcessValueContextImpl(stateAccessor(window, style), value, timestamp);
- }
-
- public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window,
- ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
- return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks);
- }
-
- public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forMerge(
- Collection<W> activeToBeMerged, W mergeResult, StateStyle style) {
- return new OnMergeContextImpl(
- new MergingStateAccessorImpl<K, W>(activeWindows,
- windowingStrategy.getWindowFn().windowCoder(),
- stateInternals, style, activeToBeMerged, mergeResult));
- }
-
- public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forPremerge(W window) {
- return new OnPremergeContextImpl(new PremergingStateAccessorImpl<K, W>(
- activeWindows, windowingStrategy.getWindowFn().windowCoder(), stateInternals, window));
- }
-
- private class TimersImpl implements Timers {
- private final StateNamespace namespace;
-
- public TimersImpl(StateNamespace namespace) {
- Preconditions.checkArgument(namespace instanceof WindowNamespace);
- this.namespace = namespace;
- }
-
- @Override
- public void setTimer(Instant timestamp, TimeDomain timeDomain) {
- timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
- }
-
- @Override
- public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
- timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
- }
-
- @Override
- public Instant currentProcessingTime() {
- return timerInternals.currentProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return timerInternals.currentSynchronizedProcessingTime();
- }
-
- @Override
- public Instant currentEventTime() {
- return timerInternals.currentInputWatermarkTime();
- }
- }
-
- // ======================================================================
- // StateAccessors
- // ======================================================================
- static class StateAccessorImpl<K, W extends BoundedWindow> implements StateAccessor<K> {
-
-
- protected final ActiveWindowSet<W> activeWindows;
- protected final StateContext<W> context;
- protected final StateNamespace windowNamespace;
- protected final Coder<W> windowCoder;
- protected final StateInternals<K> stateInternals;
- protected final StateStyle style;
-
- public StateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
- StateInternals<K> stateInternals, StateContext<W> context, StateStyle style) {
-
- this.activeWindows = activeWindows;
- this.windowCoder = windowCoder;
- this.stateInternals = stateInternals;
- this.context = checkNotNull(context);
- this.windowNamespace = namespaceFor(context.window());
- this.style = style;
- }
-
- protected StateNamespace namespaceFor(W window) {
- return StateNamespaces.window(windowCoder, window);
- }
-
- protected StateNamespace windowNamespace() {
- return windowNamespace;
- }
-
- W window() {
- return context.window();
- }
-
- StateNamespace namespace() {
- return windowNamespace();
- }
-
- @Override
- public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
- switch (style) {
- case DIRECT:
- return stateInternals.state(windowNamespace(), address, context);
- case RENAMED:
- return stateInternals.state(
- namespaceFor(activeWindows.writeStateAddress(context.window())), address, context);
- }
- throw new RuntimeException(); // cases are exhaustive.
- }
- }
-
- static class MergingStateAccessorImpl<K, W extends BoundedWindow>
- extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
- private final Collection<W> activeToBeMerged;
-
- public MergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
- StateInternals<K> stateInternals, StateStyle style, Collection<W> activeToBeMerged,
- W mergeResult) {
- super(activeWindows, windowCoder, stateInternals,
- StateContexts.windowOnly(mergeResult), style);
- this.activeToBeMerged = activeToBeMerged;
- }
-
- @Override
- public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
- switch (style) {
- case DIRECT:
- return stateInternals.state(windowNamespace(), address, context);
- case RENAMED:
- return stateInternals.state(
- namespaceFor(activeWindows.mergedWriteStateAddress(
- activeToBeMerged, context.window())),
- address,
- context);
- }
- throw new RuntimeException(); // cases are exhaustive.
- }
-
- @Override
- public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
- StateTag<? super K, StateT> address) {
- ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
- for (W mergingWindow : activeToBeMerged) {
- StateNamespace namespace = null;
- switch (style) {
- case DIRECT:
- namespace = namespaceFor(mergingWindow);
- break;
- case RENAMED:
- namespace = namespaceFor(activeWindows.writeStateAddress(mergingWindow));
- break;
- }
- Preconditions.checkNotNull(namespace); // cases are exhaustive.
- builder.put(mergingWindow, stateInternals.state(namespace, address, context));
- }
- return builder.build();
- }
- }
-
- static class PremergingStateAccessorImpl<K, W extends BoundedWindow>
- extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
- public PremergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
- StateInternals<K> stateInternals, W window) {
- super(activeWindows, windowCoder, stateInternals,
- StateContexts.windowOnly(window), StateStyle.RENAMED);
- }
-
- Collection<W> mergingWindows() {
- return activeWindows.readStateAddresses(context.window());
- }
-
- @Override
- public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
- StateTag<? super K, StateT> address) {
- ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
- for (W stateAddressWindow : activeWindows.readStateAddresses(context.window())) {
- StateT stateForWindow =
- stateInternals.state(namespaceFor(stateAddressWindow), address, context);
- builder.put(stateAddressWindow, stateForWindow);
- }
- return builder.build();
- }
- }
-
- // ======================================================================
- // Contexts
- // ======================================================================
-
- private class ContextImpl extends ReduceFn<K, InputT, OutputT, W>.Context {
- private final StateAccessorImpl<K, W> state;
- private final TimersImpl timers;
-
- private ContextImpl(StateAccessorImpl<K, W> state) {
- reduceFn.super();
- this.state = state;
- this.timers = new TimersImpl(state.namespace());
- }
-
- @Override
- public K key() {
- return key;
- }
-
- @Override
- public W window() {
- return state.window();
- }
-
- @Override
- public WindowingStrategy<?, W> windowingStrategy() {
- return windowingStrategy;
- }
-
- @Override
- public StateAccessor<K> state() {
- return state;
- }
-
- @Override
- public Timers timers() {
- return timers;
- }
- }
-
- private class ProcessValueContextImpl
- extends ReduceFn<K, InputT, OutputT, W>.ProcessValueContext {
- private final InputT value;
- private final Instant timestamp;
- private final StateAccessorImpl<K, W> state;
- private final TimersImpl timers;
-
- private ProcessValueContextImpl(StateAccessorImpl<K, W> state,
- InputT value, Instant timestamp) {
- reduceFn.super();
- this.state = state;
- this.value = value;
- this.timestamp = timestamp;
- this.timers = new TimersImpl(state.namespace());
- }
-
- @Override
- public K key() {
- return key;
- }
-
- @Override
- public W window() {
- return state.window();
- }
-
- @Override
- public WindowingStrategy<?, W> windowingStrategy() {
- return windowingStrategy;
- }
-
- @Override
- public StateAccessor<K> state() {
- return state;
- }
-
- @Override
- public InputT value() {
- return value;
- }
-
- @Override
- public Instant timestamp() {
- return timestamp;
- }
-
- @Override
- public Timers timers() {
- return timers;
- }
- }
-
- private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext {
- private final StateAccessorImpl<K, W> state;
- private final ReadableState<PaneInfo> pane;
- private final OnTriggerCallbacks<OutputT> callbacks;
- private final TimersImpl timers;
-
- private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane,
- OnTriggerCallbacks<OutputT> callbacks) {
- reduceFn.super();
- this.state = state;
- this.pane = pane;
- this.callbacks = callbacks;
- this.timers = new TimersImpl(state.namespace());
- }
-
- @Override
- public K key() {
- return key;
- }
-
- @Override
- public W window() {
- return state.window();
- }
-
- @Override
- public WindowingStrategy<?, W> windowingStrategy() {
- return windowingStrategy;
- }
-
- @Override
- public StateAccessor<K> state() {
- return state;
- }
-
- @Override
- public PaneInfo paneInfo() {
- return pane.read();
- }
-
- @Override
- public void output(OutputT value) {
- callbacks.output(value);
- }
-
- @Override
- public Timers timers() {
- return timers;
- }
- }
-
- private class OnMergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext {
- private final MergingStateAccessorImpl<K, W> state;
- private final TimersImpl timers;
-
- private OnMergeContextImpl(MergingStateAccessorImpl<K, W> state) {
- reduceFn.super();
- this.state = state;
- this.timers = new TimersImpl(state.namespace());
- }
-
- @Override
- public K key() {
- return key;
- }
-
- @Override
- public WindowingStrategy<?, W> windowingStrategy() {
- return windowingStrategy;
- }
-
- @Override
- public MergingStateAccessor<K, W> state() {
- return state;
- }
-
- @Override
- public W window() {
- return state.window();
- }
-
- @Override
- public Timers timers() {
- return timers;
- }
- }
-
- private class OnPremergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext {
- private final PremergingStateAccessorImpl<K, W> state;
- private final TimersImpl timers;
-
- private OnPremergeContextImpl(PremergingStateAccessorImpl<K, W> state) {
- reduceFn.super();
- this.state = state;
- this.timers = new TimersImpl(state.namespace());
- }
-
- @Override
- public K key() {
- return key;
- }
-
- @Override
- public WindowingStrategy<?, W> windowingStrategy() {
- return windowingStrategy;
- }
-
- @Override
- public MergingStateAccessor<K, W> state() {
- return state;
- }
-
- @Override
- public W window() {
- return state.window();
- }
-
- @Override
- public Timers timers() {
- return timers;
- }
- }
-}