Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:15:08 UTC
[34/50] [abbrv] beam git commit: [BEAM-1994] Remove Flink examples package
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
deleted file mode 100644
index 24b340e..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ /dev/null
@@ -1,487 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.MapState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.SetState;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.KeyGroupsList;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.streaming.api.operators.HeapInternalTimerService;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-
-/**
- * {@link StateInternals} that uses {@link KeyGroupCheckpointedOperator}
- * to checkpoint state.
- *
- * <p>Note: the index of the key is ignored and only {@link BagState} is
- * implemented.
- *
- * <p>The handling of the local key-group range follows {@link HeapInternalTimerService}.
- */
-public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
-
- private final Coder<K> keyCoder;
- private final KeyGroupsList localKeyGroupRange;
- private KeyedStateBackend keyedStateBackend;
- private final int localKeyGroupRangeStartIdx;
-
- // stateName -> namespace -> (valueCoder, value)
-  // stateName -> (valueCoder, namespace -> value), one table per local key group
-
- public FlinkKeyGroupStateInternals(
- Coder<K> keyCoder,
- KeyedStateBackend keyedStateBackend) {
- this.keyCoder = keyCoder;
- this.keyedStateBackend = keyedStateBackend;
- this.localKeyGroupRange = keyedStateBackend.getKeyGroupRange();
- // find the starting index of the local key-group range
- int startIdx = Integer.MAX_VALUE;
- for (Integer keyGroupIdx : localKeyGroupRange) {
- startIdx = Math.min(keyGroupIdx, startIdx);
- }
- this.localKeyGroupRangeStartIdx = startIdx;
- stateTables = (Map<String, Tuple2<Coder<?>, Map<String, ?>>>[])
- new Map[localKeyGroupRange.getNumberOfKeyGroups()];
- for (int i = 0; i < stateTables.length; i++) {
- stateTables[i] = new HashMap<>();
- }
- }
-
- @Override
- public K getKey() {
- ByteBuffer keyBytes = (ByteBuffer) keyedStateBackend.getCurrentKey();
- try {
- return CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
- } catch (CoderException e) {
- throw new RuntimeException("Error decoding key.", e);
- }
- }
-
- @Override
- public <T extends State> T state(
- final StateNamespace namespace,
- StateTag<? super K, T> address) {
-
- return state(namespace, address, StateContexts.nullContext());
- }
-
- @Override
- public <T extends State> T state(
- final StateNamespace namespace,
- StateTag<? super K, T> address,
- final StateContext<?> context) {
-
- return address.bind(new StateTag.StateBinder<K>() {
-
- @Override
- public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address,
- Coder<T> coder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", ValueState.class.getSimpleName()));
- }
-
- @Override
- public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address,
- Coder<T> elemCoder) {
-
- return new FlinkKeyGroupBagState<>(address, namespace, elemCoder);
- }
-
- @Override
- public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address,
- Coder<T> elemCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", SetState.class.getSimpleName()));
- }
-
- @Override
- public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
- Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", MapState.class.getSimpleName()));
- }
-
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT>
- bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- throw new UnsupportedOperationException("bindCombiningValue is not supported.");
- }
-
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
-
- }
-
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn) {
- throw new UnsupportedOperationException(
- "bindKeyedCombiningValueWithContext is not supported.");
- }
-
- @Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
- throw new UnsupportedOperationException(
-            String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
- }
- });
- }
-
- /**
-   * Modeled on {@link Combine.CombineFn}.
-   *
-   * <p>Accumulators are stored per key group; addInput() is called when an element
-   * arrives, and extractOutput() is called to produce the desired value when the
-   * data is read.
- */
- interface KeyGroupCombiner<InputT, AccumT, OutputT> {
-
- /**
- * Returns a new, mutable accumulator value, representing the accumulation
- * of zero input values.
- */
- AccumT createAccumulator();
-
- /**
- * Adds the given input value to the given accumulator, returning the
- * new accumulator value.
- */
- AccumT addInput(AccumT accumulator, InputT input);
-
- /**
- * Returns the output value that is the result of all accumulators from KeyGroups
- * that are assigned to this operator.
- */
- OutputT extractOutput(Iterable<AccumT> accumulators);
- }
-
- private abstract class AbstractKeyGroupState<InputT, AccumT, OutputT> {
-
- private String stateName;
- private String namespace;
- private Coder<AccumT> coder;
- private KeyGroupCombiner<InputT, AccumT, OutputT> keyGroupCombiner;
-
- AbstractKeyGroupState(
- String stateName,
- String namespace,
- Coder<AccumT> coder,
- KeyGroupCombiner<InputT, AccumT, OutputT> keyGroupCombiner) {
- this.stateName = stateName;
- this.namespace = namespace;
- this.coder = coder;
- this.keyGroupCombiner = keyGroupCombiner;
- }
-
- /**
-     * Selects the key group of the current input and adds the input to that
-     * group's accumulator.
- */
- void addInput(InputT input) {
- int keyGroupIdx = keyedStateBackend.getCurrentKeyGroupIndex();
- int localIdx = getIndexForKeyGroup(keyGroupIdx);
- Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = stateTables[localIdx];
- Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
- if (tuple2 == null) {
- tuple2 = new Tuple2<>();
- tuple2.f0 = coder;
- tuple2.f1 = new HashMap<>();
- stateTable.put(stateName, tuple2);
- }
- Map<String, AccumT> map = (Map<String, AccumT>) tuple2.f1;
- AccumT accumulator = map.get(namespace);
- if (accumulator == null) {
- accumulator = keyGroupCombiner.createAccumulator();
- }
- accumulator = keyGroupCombiner.addInput(accumulator, input);
- map.put(namespace, accumulator);
- }
-
- /**
- * Get all accumulators and invoke extractOutput().
- */
- OutputT extractOutput() {
- List<AccumT> accumulators = new ArrayList<>(stateTables.length);
- for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable : stateTables) {
- Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
- if (tuple2 != null) {
- AccumT accumulator = (AccumT) tuple2.f1.get(namespace);
- if (accumulator != null) {
- accumulators.add(accumulator);
- }
- }
- }
- return keyGroupCombiner.extractOutput(accumulators);
- }
-
- /**
-     * Returns false as soon as the first accumulator is found; true otherwise.
- */
- boolean isEmptyInternal() {
- for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable : stateTables) {
- Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
- if (tuple2 != null) {
- AccumT accumulator = (AccumT) tuple2.f1.get(namespace);
- if (accumulator != null) {
- return false;
- }
- }
- }
- return true;
- }
-
- /**
-     * Clears accumulators and removes maps that become empty.
- */
- void clearInternal() {
- for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable : stateTables) {
- Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
- if (tuple2 != null) {
- tuple2.f1.remove(namespace);
- if (tuple2.f1.size() == 0) {
- stateTable.remove(stateName);
- }
- }
- }
- }
-
- }
-
- private int getIndexForKeyGroup(int keyGroupIdx) {
- checkArgument(localKeyGroupRange.contains(keyGroupIdx),
- "Key Group " + keyGroupIdx + " does not belong to the local range.");
- return keyGroupIdx - this.localKeyGroupRangeStartIdx;
- }
-
- private class KeyGroupBagCombiner<T> implements KeyGroupCombiner<T, List<T>, Iterable<T>> {
-
- @Override
- public List<T> createAccumulator() {
- return new ArrayList<>();
- }
-
- @Override
- public List<T> addInput(List<T> accumulator, T input) {
- accumulator.add(input);
- return accumulator;
- }
-
- @Override
- public Iterable<T> extractOutput(Iterable<List<T>> accumulators) {
- List<T> result = new ArrayList<>();
-      // This could potentially return an unmodifiable view instead of copying.
- for (List<T> list : accumulators) {
- result.addAll(list);
- }
- return result;
- }
- }
-
- private class FlinkKeyGroupBagState<T> extends AbstractKeyGroupState<T, List<T>, Iterable<T>>
- implements BagState<T> {
-
- private final StateNamespace namespace;
- private final StateTag<? super K, BagState<T>> address;
-
- FlinkKeyGroupBagState(
- StateTag<? super K, BagState<T>> address,
- StateNamespace namespace,
- Coder<T> coder) {
- super(address.getId(), namespace.stringKey(), ListCoder.of(coder),
- new KeyGroupBagCombiner<T>());
- this.namespace = namespace;
- this.address = address;
- }
-
- @Override
- public void add(T input) {
- addInput(input);
- }
-
- @Override
- public BagState<T> readLater() {
- return this;
- }
-
- @Override
- public Iterable<T> read() {
- Iterable<T> result = extractOutput();
- return result != null ? result : Collections.<T>emptyList();
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- try {
- return isEmptyInternal();
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
-
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
- };
- }
-
- @Override
- public void clear() {
- clearInternal();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FlinkKeyGroupBagState<?> that = (FlinkKeyGroupBagState<?>) o;
-
- return namespace.equals(that.namespace) && address.equals(that.address);
-
- }
-
- @Override
- public int hashCode() {
- int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
- }
-
- /**
- * Snapshots the state {@code (stateName -> (valueCoder && (namespace -> value)))} for a given
- * {@code keyGroupIdx}.
- *
- * @param keyGroupIdx the id of the key-group to be put in the snapshot.
- * @param out the stream to write to.
- */
- public void snapshotKeyGroupState(int keyGroupIdx, DataOutputStream out) throws Exception {
- int localIdx = getIndexForKeyGroup(keyGroupIdx);
- Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = stateTables[localIdx];
- Preconditions.checkState(stateTable.size() <= Short.MAX_VALUE,
-        "Too many states: " + stateTable.size() + ". Currently at most "
- + Short.MAX_VALUE + " states are supported");
- out.writeShort(stateTable.size());
- for (Map.Entry<String, Tuple2<Coder<?>, Map<String, ?>>> entry : stateTable.entrySet()) {
- out.writeUTF(entry.getKey());
- Coder coder = entry.getValue().f0;
- InstantiationUtil.serializeObject(out, coder);
- Map<String, ?> map = entry.getValue().f1;
- out.writeInt(map.size());
- for (Map.Entry<String, ?> entry1 : map.entrySet()) {
- StringUtf8Coder.of().encode(entry1.getKey(), out, Context.NESTED);
- coder.encode(entry1.getValue(), out, Context.NESTED);
- }
- }
- }
-
- /**
-   * Restores the state {@code (stateName -> (valueCoder && (namespace -> value)))}
-   * for a given {@code keyGroupIdx}.
-   *
-   * @param keyGroupIdx the id of the key-group whose state is to be restored.
- * @param in the stream to read from.
- * @param userCodeClassLoader the class loader that will be used to deserialize
- * the valueCoder.
- */
- public void restoreKeyGroupState(int keyGroupIdx, DataInputStream in,
- ClassLoader userCodeClassLoader) throws Exception {
- int localIdx = getIndexForKeyGroup(keyGroupIdx);
- Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = stateTables[localIdx];
- int numStates = in.readShort();
- for (int i = 0; i < numStates; ++i) {
- String stateName = in.readUTF();
- Coder coder = InstantiationUtil.deserializeObject(in, userCodeClassLoader);
- Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
- if (tuple2 == null) {
- tuple2 = new Tuple2<>();
- tuple2.f0 = coder;
- tuple2.f1 = new HashMap<>();
- stateTable.put(stateName, tuple2);
- }
- Map<String, Object> map = (Map<String, Object>) tuple2.f1;
- int mapSize = in.readInt();
- for (int j = 0; j < mapSize; j++) {
- String namespace = StringUtf8Coder.of().decode(in, Context.NESTED);
- Object value = coder.decode(in, Context.NESTED);
- map.put(namespace, value);
- }
- }
- }
-
-}
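For context on the removed class: it keeps one in-memory table per local key group and serializes each table as (numStates, then per state: name, serialized coder, numNamespaces, namespace/value pairs), as seen in snapshotKeyGroupState()/restoreKeyGroupState() above. Below is a minimal, dependency-free sketch of that round trip; it omits the coder and uses plain Strings in place of coder-encoded values, and the class and method names are illustrative, not part of the commit.

import java.io.*;
import java.util.*;

public class KeyGroupSnapshotSketch {

  // stateName -> (namespace -> value), the table for a single key group.
  static void snapshot(Map<String, Map<String, String>> table, DataOutputStream out)
      throws IOException {
    out.writeShort(table.size());
    for (Map.Entry<String, Map<String, String>> state : table.entrySet()) {
      out.writeUTF(state.getKey());          // state name
      out.writeInt(state.getValue().size()); // number of namespaces
      for (Map.Entry<String, String> e : state.getValue().entrySet()) {
        out.writeUTF(e.getKey());   // namespace
        out.writeUTF(e.getValue()); // value (coder-encoded bytes in the real class)
      }
    }
  }

  static Map<String, Map<String, String>> restore(DataInputStream in) throws IOException {
    Map<String, Map<String, String>> table = new HashMap<>();
    int numStates = in.readShort();
    for (int i = 0; i < numStates; i++) {
      String stateName = in.readUTF();
      Map<String, String> perNamespace = new HashMap<>();
      int mapSize = in.readInt();
      for (int j = 0; j < mapSize; j++) {
        perNamespace.put(in.readUTF(), in.readUTF());
      }
      table.put(stateName, perNamespace);
    }
    return table;
  }

  public static void main(String[] args) throws IOException {
    Map<String, Map<String, String>> table = new HashMap<>();
    table.put("bagState", new HashMap<>(Collections.singletonMap("window1", "valueA")));

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    snapshot(table, new DataOutputStream(bytes));
    Map<String, Map<String, String>> restored =
        restore(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(restored); // {bagState={window1=valueA}}
  }
}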
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
deleted file mode 100644
index 2bf0bf1..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import com.google.common.collect.Iterators;
-import java.util.Collections;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.MapState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.SetState;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.runtime.state.OperatorStateBackend;
-
-/**
- * {@link StateInternals} that uses a Flink {@link OperatorStateBackend}
- * to manage the split-distribute state.
- *
- * <p>Elements in ListState will be redistributed in round-robin fashion
- * to operators when restarting with a different parallelism.
- *
- * <p>Note: the indexes of key and namespace are ignored and only
- * {@link BagState} is implemented.
- */
-public class FlinkSplitStateInternals<K> implements StateInternals<K> {
-
- private final OperatorStateBackend stateBackend;
-
- public FlinkSplitStateInternals(OperatorStateBackend stateBackend) {
- this.stateBackend = stateBackend;
- }
-
- @Override
- public K getKey() {
- return null;
- }
-
- @Override
- public <T extends State> T state(
- final StateNamespace namespace,
- StateTag<? super K, T> address) {
-
- return state(namespace, address, StateContexts.nullContext());
- }
-
- @Override
- public <T extends State> T state(
- final StateNamespace namespace,
- StateTag<? super K, T> address,
- final StateContext<?> context) {
-
- return address.bind(new StateTag.StateBinder<K>() {
-
- @Override
- public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address,
- Coder<T> coder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", ValueState.class.getSimpleName()));
- }
-
- @Override
- public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address,
- Coder<T> elemCoder) {
-
- return new FlinkSplitBagState<>(stateBackend, address, namespace, elemCoder);
- }
-
- @Override
- public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address,
- Coder<T> elemCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", SetState.class.getSimpleName()));
- }
-
- @Override
- public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
- Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", MapState.class.getSimpleName()));
- }
-
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT>
- bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- throw new UnsupportedOperationException("bindCombiningValue is not supported.");
- }
-
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
-
- }
-
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn) {
- throw new UnsupportedOperationException(
- "bindKeyedCombiningValueWithContext is not supported.");
- }
-
- @Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
- throw new UnsupportedOperationException(
-            String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
- }
- });
- }
-
- private static class FlinkSplitBagState<K, T> implements BagState<T> {
-
- private final ListStateDescriptor<T> descriptor;
- private OperatorStateBackend flinkStateBackend;
- private final StateNamespace namespace;
- private final StateTag<? super K, BagState<T>> address;
-
- FlinkSplitBagState(
- OperatorStateBackend flinkStateBackend,
- StateTag<? super K, BagState<T>> address,
- StateNamespace namespace,
- Coder<T> coder) {
- this.flinkStateBackend = flinkStateBackend;
- this.namespace = namespace;
- this.address = address;
-
- CoderTypeInformation<T> typeInfo =
- new CoderTypeInformation<>(coder);
-
- descriptor = new ListStateDescriptor<>(address.getId(),
- typeInfo.createSerializer(new ExecutionConfig()));
- }
-
- @Override
- public void add(T input) {
- try {
- flinkStateBackend.getOperatorState(descriptor).add(input);
- } catch (Exception e) {
- throw new RuntimeException("Error updating state.", e);
- }
- }
-
- @Override
- public BagState<T> readLater() {
- return this;
- }
-
- @Override
- public Iterable<T> read() {
- try {
- Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
- return result != null ? result : Collections.<T>emptyList();
- } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- try {
- Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
-            // PartitionableListState.get() returns an empty collection when there
-            // is no element, whereas KeyedListState returns null in that case.
- return result == null || Iterators.size(result.iterator()) == 0;
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
-
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
- };
- }
-
- @Override
- public void clear() {
- try {
- flinkStateBackend.getOperatorState(descriptor).clear();
- } catch (Exception e) {
-        throw new RuntimeException("Error clearing state.", e);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FlinkSplitBagState<?, ?> that = (FlinkSplitBagState<?, ?>) o;
-
- return namespace.equals(that.namespace) && address.equals(that.address);
-
- }
-
- @Override
- public int hashCode() {
- int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
- }
-
-}
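The removed FlinkSplitStateInternals leans on Flink operator ListState semantics: on restore with a different parallelism, the union of all stored elements is handed out to the new subtasks in round-robin fashion, which is why the class can ignore key and namespace. A simplified standalone sketch of that redistribution follows (an illustrative model only, not Flink's actual implementation).

import java.util.*;

public class SplitRedistributeSketch {

  // Re-assigns the union of all old subtasks' list-state elements to
  // newParallelism operator instances in round-robin order.
  static List<List<String>> redistribute(List<String> allElements, int newParallelism) {
    List<List<String>> perOperator = new ArrayList<>();
    for (int i = 0; i < newParallelism; i++) {
      perOperator.add(new ArrayList<>());
    }
    for (int i = 0; i < allElements.size(); i++) {
      perOperator.get(i % newParallelism).add(allElements.get(i));
    }
    return perOperator;
  }

  public static void main(String[] args) {
    // Five elements collected across the old subtasks, rescaled to parallelism 3.
    List<String> elements = Arrays.asList("a", "b", "c", "d", "e");
    System.out.println(redistribute(elements, 3)); // [[a, d], [b, e], [c]]
  }
}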
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
deleted file mode 100644
index 4f961e5..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ /dev/null
@@ -1,1053 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import com.google.common.collect.Lists;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.MapState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.SetState;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.joda.time.Instant;
-
-/**
- * {@link StateInternals} that uses a Flink {@link KeyedStateBackend} to manage state.
- *
- * <p>Note: In the Flink streaming runner the key is always encoded
- * using a {@link Coder} and stored in a {@link ByteBuffer}.
- */
-public class FlinkStateInternals<K> implements StateInternals<K> {
-
- private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
- private Coder<K> keyCoder;
-
-  // On recovery, these will not be properly set because we don't
-  // know which watermark hold states there are in the Flink state backend.
- private final Map<String, Instant> watermarkHolds = new HashMap<>();
-
- public FlinkStateInternals(KeyedStateBackend<ByteBuffer> flinkStateBackend, Coder<K> keyCoder) {
- this.flinkStateBackend = flinkStateBackend;
- this.keyCoder = keyCoder;
- }
-
- /**
- * Returns the minimum over all watermark holds.
- */
- public Instant watermarkHold() {
- long min = Long.MAX_VALUE;
- for (Instant hold: watermarkHolds.values()) {
- min = Math.min(min, hold.getMillis());
- }
- return new Instant(min);
- }
-
- @Override
- public K getKey() {
- ByteBuffer keyBytes = flinkStateBackend.getCurrentKey();
- try {
- return CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
- } catch (CoderException e) {
- throw new RuntimeException("Error decoding key.", e);
- }
- }
-
- @Override
- public <T extends State> T state(
- final StateNamespace namespace,
- StateTag<? super K, T> address) {
-
- return state(namespace, address, StateContexts.nullContext());
- }
-
- @Override
- public <T extends State> T state(
- final StateNamespace namespace,
- StateTag<? super K, T> address,
- final StateContext<?> context) {
-
- return address.bind(new StateTag.StateBinder<K>() {
-
- @Override
- public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address,
- Coder<T> coder) {
-
- return new FlinkValueState<>(flinkStateBackend, address, namespace, coder);
- }
-
- @Override
- public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address,
- Coder<T> elemCoder) {
-
- return new FlinkBagState<>(flinkStateBackend, address, namespace, elemCoder);
- }
-
- @Override
- public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address,
- Coder<T> elemCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", SetState.class.getSimpleName()));
- }
-
- @Override
- public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
- Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", MapState.class.getSimpleName()));
- }
-
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT>
- bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-
- return new FlinkCombiningState<>(
- flinkStateBackend, address, combineFn, namespace, accumCoder);
- }
-
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkKeyedCombiningState<>(
- flinkStateBackend,
- address,
- combineFn,
- namespace,
- accumCoder,
- FlinkStateInternals.this);
- }
-
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkCombiningStateWithContext<>(
- flinkStateBackend,
- address,
- combineFn,
- namespace,
- accumCoder,
- FlinkStateInternals.this,
- CombineContextFactory.createFromStateContext(context));
- }
-
- @Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
-
- return new FlinkWatermarkHoldState<>(
- flinkStateBackend, FlinkStateInternals.this, address, namespace, outputTimeFn);
- }
- });
- }
-
- private static class FlinkValueState<K, T> implements ValueState<T> {
-
- private final StateNamespace namespace;
- private final StateTag<? super K, ValueState<T>> address;
- private final ValueStateDescriptor<T> flinkStateDescriptor;
- private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
-
- FlinkValueState(
- KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, ValueState<T>> address,
- StateNamespace namespace,
- Coder<T> coder) {
-
- this.namespace = namespace;
- this.address = address;
- this.flinkStateBackend = flinkStateBackend;
-
- CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder);
-
- flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
- }
-
- @Override
- public void write(T input) {
- try {
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).update(input);
- } catch (Exception e) {
- throw new RuntimeException("Error updating state.", e);
- }
- }
-
- @Override
- public ValueState<T> readLater() {
- return this;
- }
-
- @Override
- public T read() {
- try {
- return flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).value();
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public void clear() {
- try {
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).clear();
- } catch (Exception e) {
- throw new RuntimeException("Error clearing state.", e);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FlinkValueState<?, ?> that = (FlinkValueState<?, ?>) o;
-
- return namespace.equals(that.namespace) && address.equals(that.address);
-
- }
-
- @Override
- public int hashCode() {
- int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
- }
-
- private static class FlinkBagState<K, T> implements BagState<T> {
-
- private final StateNamespace namespace;
- private final StateTag<? super K, BagState<T>> address;
- private final ListStateDescriptor<T> flinkStateDescriptor;
- private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
-
- FlinkBagState(
- KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, BagState<T>> address,
- StateNamespace namespace,
- Coder<T> coder) {
-
- this.namespace = namespace;
- this.address = address;
- this.flinkStateBackend = flinkStateBackend;
-
- CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder);
-
- flinkStateDescriptor = new ListStateDescriptor<>(address.getId(), typeInfo);
- }
-
- @Override
- public void add(T input) {
- try {
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).add(input);
- } catch (Exception e) {
- throw new RuntimeException("Error adding to bag state.", e);
- }
- }
-
- @Override
- public BagState<T> readLater() {
- return this;
- }
-
- @Override
- public Iterable<T> read() {
- try {
- Iterable<T> result = flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).get();
-
- return result != null ? result : Collections.<T>emptyList();
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- try {
- Iterable<T> result = flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).get();
- return result == null;
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
-
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
- };
- }
-
- @Override
- public void clear() {
- try {
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).clear();
- } catch (Exception e) {
- throw new RuntimeException("Error clearing state.", e);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FlinkBagState<?, ?> that = (FlinkBagState<?, ?>) o;
-
- return namespace.equals(that.namespace) && address.equals(that.address);
-
- }
-
- @Override
- public int hashCode() {
- int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
- }
-
- private static class FlinkCombiningState<K, InputT, AccumT, OutputT>
- implements CombiningState<InputT, AccumT, OutputT> {
-
- private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
- private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
- private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
- private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
-
- FlinkCombiningState(
- KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
- StateNamespace namespace,
- Coder<AccumT> accumCoder) {
-
- this.namespace = namespace;
- this.address = address;
- this.combineFn = combineFn;
- this.flinkStateBackend = flinkStateBackend;
-
- CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
- flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
- }
-
- @Override
- public CombiningState<InputT, AccumT, OutputT> readLater() {
- return this;
- }
-
- @Override
- public void add(InputT value) {
- try {
- org.apache.flink.api.common.state.ValueState<AccumT> state =
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor);
-
- AccumT current = state.value();
- if (current == null) {
- current = combineFn.createAccumulator();
- }
- current = combineFn.addInput(current, value);
- state.update(current);
- } catch (Exception e) {
-        throw new RuntimeException("Error adding to state.", e);
- }
- }
-
- @Override
- public void addAccum(AccumT accum) {
- try {
- org.apache.flink.api.common.state.ValueState<AccumT> state =
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor);
-
- AccumT current = state.value();
- if (current == null) {
- state.update(accum);
- } else {
- current = combineFn.mergeAccumulators(Lists.newArrayList(current, accum));
- state.update(current);
- }
- } catch (Exception e) {
- throw new RuntimeException("Error adding to state.", e);
- }
- }
-
- @Override
- public AccumT getAccum() {
- try {
- return flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).value();
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(accumulators);
- }
-
- @Override
- public OutputT read() {
- try {
- org.apache.flink.api.common.state.ValueState<AccumT> state =
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor);
-
- AccumT accum = state.value();
- if (accum != null) {
- return combineFn.extractOutput(accum);
- } else {
- return combineFn.extractOutput(combineFn.createAccumulator());
- }
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- try {
- return flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).value() == null;
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
-
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
- };
- }
-
- @Override
- public void clear() {
- try {
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).clear();
- } catch (Exception e) {
- throw new RuntimeException("Error clearing state.", e);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FlinkCombiningState<?, ?, ?, ?> that =
- (FlinkCombiningState<?, ?, ?, ?>) o;
-
- return namespace.equals(that.namespace) && address.equals(that.address);
-
- }
-
- @Override
- public int hashCode() {
- int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
- }
-
- private static class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
- implements CombiningState<InputT, AccumT, OutputT> {
-
- private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
- private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
- private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
- private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
- private final FlinkStateInternals<K> flinkStateInternals;
-
- FlinkKeyedCombiningState(
- KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
- StateNamespace namespace,
- Coder<AccumT> accumCoder,
- FlinkStateInternals<K> flinkStateInternals) {
-
- this.namespace = namespace;
- this.address = address;
- this.combineFn = combineFn;
- this.flinkStateBackend = flinkStateBackend;
- this.flinkStateInternals = flinkStateInternals;
-
- CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
- flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
- }
-
- @Override
- public CombiningState<InputT, AccumT, OutputT> readLater() {
- return this;
- }
-
- @Override
- public void add(InputT value) {
- try {
- org.apache.flink.api.common.state.ValueState<AccumT> state =
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor);
-
- AccumT current = state.value();
- if (current == null) {
- current = combineFn.createAccumulator(flinkStateInternals.getKey());
- }
- current = combineFn.addInput(flinkStateInternals.getKey(), current, value);
- state.update(current);
- } catch (Exception e) {
-        throw new RuntimeException("Error adding to state.", e);
- }
- }
-
- @Override
- public void addAccum(AccumT accum) {
- try {
- org.apache.flink.api.common.state.ValueState<AccumT> state =
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor);
-
- AccumT current = state.value();
- if (current == null) {
- state.update(accum);
- } else {
- current = combineFn.mergeAccumulators(
- flinkStateInternals.getKey(),
- Lists.newArrayList(current, accum));
- state.update(current);
- }
- } catch (Exception e) {
- throw new RuntimeException("Error adding to state.", e);
- }
- }
-
- @Override
- public AccumT getAccum() {
- try {
- return flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).value();
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators);
- }
-
- @Override
- public OutputT read() {
- try {
- org.apache.flink.api.common.state.ValueState<AccumT> state =
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor);
-
- AccumT accum = state.value();
- if (accum != null) {
- return combineFn.extractOutput(flinkStateInternals.getKey(), accum);
- } else {
- return combineFn.extractOutput(
- flinkStateInternals.getKey(),
- combineFn.createAccumulator(flinkStateInternals.getKey()));
- }
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- try {
- return flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).value() == null;
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
-
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
- };
- }
-
- @Override
- public void clear() {
- try {
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).clear();
- } catch (Exception e) {
- throw new RuntimeException("Error clearing state.", e);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FlinkKeyedCombiningState<?, ?, ?, ?> that =
- (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
-
- return namespace.equals(that.namespace) && address.equals(that.address);
-
- }
-
- @Override
- public int hashCode() {
- int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
- }
-
- private static class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
- implements CombiningState<InputT, AccumT, OutputT> {
-
- private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
- private final CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn;
- private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
- private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
- private final FlinkStateInternals<K> flinkStateInternals;
- private final CombineWithContext.Context context;
-
- FlinkCombiningStateWithContext(
- KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn,
- StateNamespace namespace,
- Coder<AccumT> accumCoder,
- FlinkStateInternals<K> flinkStateInternals,
- CombineWithContext.Context context) {
-
- this.namespace = namespace;
- this.address = address;
- this.combineFn = combineFn;
- this.flinkStateBackend = flinkStateBackend;
- this.flinkStateInternals = flinkStateInternals;
- this.context = context;
-
- CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
- flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
- }
-
- @Override
- public CombiningState<InputT, AccumT, OutputT> readLater() {
- return this;
- }
-
- @Override
- public void add(InputT value) {
- try {
- org.apache.flink.api.common.state.ValueState<AccumT> state =
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor);
-
- AccumT current = state.value();
- if (current == null) {
- current = combineFn.createAccumulator(flinkStateInternals.getKey(), context);
- }
- current = combineFn.addInput(flinkStateInternals.getKey(), current, value, context);
- state.update(current);
- } catch (Exception e) {
-        throw new RuntimeException("Error adding to state.", e);
- }
- }
-
- @Override
- public void addAccum(AccumT accum) {
- try {
- org.apache.flink.api.common.state.ValueState<AccumT> state =
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor);
-
- AccumT current = state.value();
- if (current == null) {
- state.update(accum);
- } else {
- current = combineFn.mergeAccumulators(
- flinkStateInternals.getKey(),
- Lists.newArrayList(current, accum),
- context);
- state.update(current);
- }
- } catch (Exception e) {
- throw new RuntimeException("Error adding to state.", e);
- }
- }
-
- @Override
- public AccumT getAccum() {
- try {
- return flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).value();
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators, context);
- }
-
- @Override
- public OutputT read() {
- try {
- org.apache.flink.api.common.state.ValueState<AccumT> state =
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor);
-
-          AccumT accum = state.value();
-          if (accum == null) {
-            accum = combineFn.createAccumulator(flinkStateInternals.getKey(), context);
-          }
-          return combineFn.extractOutput(flinkStateInternals.getKey(), accum, context);
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- try {
- return flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).value() == null;
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
-
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
- };
- }
-
- @Override
- public void clear() {
- try {
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).clear();
- } catch (Exception e) {
- throw new RuntimeException("Error clearing state.", e);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FlinkCombiningStateWithContext<?, ?, ?, ?> that =
- (FlinkCombiningStateWithContext<?, ?, ?, ?>) o;
-
- return namespace.equals(that.namespace) && address.equals(that.address);
-
- }
-
- @Override
- public int hashCode() {
- int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
- }
-
- private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
- implements WatermarkHoldState<W> {
- private final StateTag<? super K, WatermarkHoldState<W>> address;
- private final OutputTimeFn<? super W> outputTimeFn;
- private final StateNamespace namespace;
- private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
- private final FlinkStateInternals<K> flinkStateInternals;
- private final ValueStateDescriptor<Instant> flinkStateDescriptor;
-
- public FlinkWatermarkHoldState(
- KeyedStateBackend<ByteBuffer> flinkStateBackend,
- FlinkStateInternals<K> flinkStateInternals,
- StateTag<? super K, WatermarkHoldState<W>> address,
- StateNamespace namespace,
- OutputTimeFn<? super W> outputTimeFn) {
- this.address = address;
- this.outputTimeFn = outputTimeFn;
- this.namespace = namespace;
- this.flinkStateBackend = flinkStateBackend;
- this.flinkStateInternals = flinkStateInternals;
-
- CoderTypeInformation<Instant> typeInfo = new CoderTypeInformation<>(InstantCoder.of());
- flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
- }
-
- @Override
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
- }
-
- @Override
- public WatermarkHoldState<W> readLater() {
- return this;
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- try {
- return flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).value() == null;
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
- };
-
- }
-
- @Override
- public void add(Instant value) {
- try {
- org.apache.flink.api.common.state.ValueState<Instant> state =
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor);
-
- Instant current = state.value();
- if (current == null) {
- state.update(value);
- flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value);
- } else {
- Instant combined = outputTimeFn.combine(current, value);
- state.update(combined);
- flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined);
- }
- } catch (Exception e) {
- throw new RuntimeException("Error updating state.", e);
- }
- }
-
- @Override
- public Instant read() {
- try {
- org.apache.flink.api.common.state.ValueState<Instant> state =
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor);
- return state.value();
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public void clear() {
- flinkStateInternals.watermarkHolds.remove(namespace.stringKey());
- try {
- org.apache.flink.api.common.state.ValueState<Instant> state =
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor);
- state.clear();
- } catch (Exception e) {
-        throw new RuntimeException("Error clearing state.", e);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FlinkWatermarkHoldState<?, ?> that = (FlinkWatermarkHoldState<?, ?>) o;
-
- if (!address.equals(that.address)) {
- return false;
- }
- if (!outputTimeFn.equals(that.outputTimeFn)) {
- return false;
- }
- return namespace.equals(that.namespace);
-
- }
-
- @Override
- public int hashCode() {
- int result = address.hashCode();
- result = 31 * result + outputTimeFn.hashCode();
- result = 31 * result + namespace.hashCode();
- return result;
- }
- }
-}
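The watermarkHolds map in the removed FlinkStateInternals is what ties this state to the output watermark: FlinkWatermarkHoldState.add() records the combined hold per namespace, and watermarkHold() returns the minimum over all holds. Here is a small standalone sketch of that bookkeeping, assuming joda-time on the classpath (as in Beam itself) and hard-coding earliest-wins combining in place of a general OutputTimeFn; the class name is hypothetical.

import java.util.HashMap;
import java.util.Map;
import org.joda.time.Instant;

public class WatermarkHoldSketch {

  // namespace.stringKey() -> current hold, mirroring FlinkStateInternals.watermarkHolds.
  private final Map<String, Instant> watermarkHolds = new HashMap<>();

  // Simplified add(): combines with "earliest wins" instead of an OutputTimeFn.
  void addHold(String namespaceKey, Instant hold) {
    Instant current = watermarkHolds.get(namespaceKey);
    if (current == null || hold.isBefore(current)) {
      watermarkHolds.put(namespaceKey, hold);
    }
  }

  // Minimum over all holds, as in FlinkStateInternals.watermarkHold().
  Instant watermarkHold() {
    long min = Long.MAX_VALUE;
    for (Instant hold : watermarkHolds.values()) {
      min = Math.min(min, hold.getMillis());
    }
    return new Instant(min);
  }

  public static void main(String[] args) {
    WatermarkHoldSketch holds = new WatermarkHoldSketch();
    holds.addHold("window1", new Instant(1000L));
    holds.addHold("window2", new Instant(500L));
    System.out.println(holds.watermarkHold()); // 1970-01-01T00:00:00.500Z
  }
}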
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java
deleted file mode 100644
index b38a520..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import java.io.DataOutputStream;
-
-/**
- * This interface is used to checkpoint per-key-group state.
- */
-public interface KeyGroupCheckpointedOperator extends KeyGroupRestoringOperator {
- /**
- * Snapshots the state for a given {@code keyGroupIndex}.
- *
- * <p>AbstractStreamOperator calls this hook from
- * AbstractStreamOperator.snapshotState() while iterating over the key groups.
- * @param keyGroupIndex the id of the key-group to be put in the snapshot.
- * @param out the stream to write to.
- */
- void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java
deleted file mode 100644
index 2bdfc6e..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import java.io.DataInputStream;
-
-/**
- * This interface is used to restore per-key-group state.
- */
-public interface KeyGroupRestoringOperator {
- /**
- * Restores the state for a given {@code keyGroupIndex}.
- * @param keyGroupIndex the id of the key-group whose state is to be restored.
- * @param in the stream to read from.
- */
- void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception;
-}
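Taken together, the two interfaces above split a key-grouped checkpoint into a write half (snapshotKeyGroupState) and a read half (restoreKeyGroupState). A minimal sketch of an implementor, assuming a hypothetical operator that keeps one long counter per key group (not part of this commit):

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.HashMap;
    import java.util.Map;

    public class CountingKeyGroupOperator implements KeyGroupCheckpointedOperator {
      // One counter per key group; the two hooks below persist and restore it.
      private final Map<Integer, Long> countsPerKeyGroup = new HashMap<>();

      @Override
      public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
        Long count = countsPerKeyGroup.get(keyGroupIndex);
        out.writeLong(count == null ? 0L : count);
      }

      @Override
      public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception {
        countsPerKeyGroup.put(keyGroupIndex, in.readLong());
      }
    }

Because KeyGroupCheckpointedOperator extends KeyGroupRestoringOperator, implementing the single combined interface is enough to satisfy both halves.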
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java
deleted file mode 100644
index 0004e9e..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal state implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/resources/log4j.properties b/runners/flink/runner/src/main/resources/log4j.properties
deleted file mode 100644
index 4b6a708..0000000
--- a/runners/flink/runner/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF,console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
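The removed default configuration silences the root logger (OFF) while still wiring up a stderr console appender; to see runner output during local debugging, one would typically change log4j.rootLogger=OFF,console to log4j.rootLogger=INFO,console.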
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java
deleted file mode 100644
index 10d6d9d..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.runners.flink.translation.types.EncodedValueComparator;
-import org.apache.beam.runners.flink.translation.types.EncodedValueTypeInformation;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.junit.Assert;
-
-/**
- * Test for {@link EncodedValueComparator}.
- */
-public class EncodedValueComparatorTest extends ComparatorTestBase<byte[]> {
-
- @Override
- protected TypeComparator<byte[]> createComparator(boolean ascending) {
- return new EncodedValueTypeInformation().createComparator(ascending, new ExecutionConfig());
- }
-
- @Override
- protected TypeSerializer<byte[]> createSerializer() {
- return new EncodedValueTypeInformation().createSerializer(new ExecutionConfig());
- }
-
- @Override
- protected void deepEquals(String message, byte[] should, byte[] is) {
- Assert.assertArrayEquals(message, should, is);
- }
-
- @Override
- protected byte[][] getSortedTestData() {
- StringUtf8Coder coder = StringUtf8Coder.of();
-
- try {
- return new byte[][]{
- CoderUtils.encodeToByteArray(coder, ""),
- CoderUtils.encodeToByteArray(coder, "Lorem Ipsum Dolor Omit Longer"),
- CoderUtils.encodeToByteArray(coder, "aaaa"),
- CoderUtils.encodeToByteArray(coder, "abcd"),
- CoderUtils.encodeToByteArray(coder, "abce"),
- CoderUtils.encodeToByteArray(coder, "abdd"),
- CoderUtils.encodeToByteArray(coder, "accd"),
- CoderUtils.encodeToByteArray(coder, "bbcd")
- };
- } catch (CoderException e) {
- throw new RuntimeException("Could not encode values.", e);
- }
- }
-}
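The sorted test data above relies on one property the comparator must reproduce: the UTF-8 encodings of these strings sort, byte for byte and treating bytes as unsigned, in the same order as the strings themselves. A standalone check of that assumption (hypothetical class, not part of this commit):

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.util.CoderUtils;

    public class EncodedOrderSketch {
      public static void main(String[] args) throws Exception {
        StringUtf8Coder coder = StringUtf8Coder.of();
        byte[] a = CoderUtils.encodeToByteArray(coder, "abcd");
        byte[] b = CoderUtils.encodeToByteArray(coder, "abce");
        // Unsigned lexicographic comparison, the order the comparator must match.
        int cmp = 0;
        for (int i = 0; cmp == 0 && i < Math.min(a.length, b.length); i++) {
          cmp = (a[i] & 0xff) - (b[i] & 0xff);
        }
        if (cmp == 0) {
          cmp = a.length - b.length;
        }
        System.out.println(cmp < 0);  // true: "abcd" encodes before "abce"
      }
    }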
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
deleted file mode 100644
index d9d174c..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.Test;
-
-/**
- * Tests the proper registration of the Flink runner.
- */
-public class FlinkRunnerRegistrarTest {
-
- @Test
- public void testFullName() {
- String[] args =
- new String[] {String.format("--runner=%s", FlinkRunner.class.getName())};
- PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
- assertEquals(FlinkRunner.class, opts.getRunner());
- }
-
- @Test
- public void testClassName() {
- String[] args =
- new String[] {String.format("--runner=%s", FlinkRunner.class.getSimpleName())};
- PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
- assertEquals(FlinkRunner.class, opts.getRunner());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
deleted file mode 100644
index d6240c4..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
-
-/**
- * {@link org.apache.beam.sdk.Pipeline} for testing Beam pipelines on the
- * {@link FlinkRunner}.
- */
-public class FlinkTestPipeline extends Pipeline {
-
- /**
- * Creates and returns a new test pipeline for batch execution.
- *
- * <p>Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call
- * {@link Pipeline#run} to execute the pipeline and check the tests.
- */
- public static FlinkTestPipeline createForBatch() {
- return create(false);
- }
-
- /**
- * Creates and returns a new test pipeline for streaming execution.
- *
- * <p>Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call
- * {@link Pipeline#run} to execute the pipeline and check the tests.
- *
- * @return the test pipeline.
- */
- public static FlinkTestPipeline createForStreaming() {
- return create(true);
- }
-
- /**
- * Creates and returns a new test pipeline for streaming or batch execution.
- *
- * <p>Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call
- * {@link Pipeline#run} to execute the pipeline and check the tests.
- *
- * @param streaming {@code true} for streaming mode, {@code false} for batch.
- * @return the test pipeline.
- */
- private static FlinkTestPipeline create(boolean streaming) {
- TestFlinkRunner flinkRunner = TestFlinkRunner.create(streaming);
- return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
- }
-
- private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,
- PipelineOptions options) {
- super(runner, options);
- }
-}
-
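For context, a typical use of the class above pairs it with PAssert, as the javadoc suggests; a minimal sketch assuming the standard Beam testing utilities (hypothetical snippet, not part of this commit):

    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;

    public class FlinkTestPipelineUsage {
      public static void main(String[] args) {
        // Batch-mode test pipeline backed by TestFlinkRunner.
        FlinkTestPipeline p = FlinkTestPipeline.createForBatch();
        PCollection<String> words = p.apply(Create.of("hello", "world"));
        PAssert.that(words).containsInAnyOrder("hello", "world");
        p.run();  // executes the pipeline and verifies the assertions
      }
    }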
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
deleted file mode 100644
index 06187f6..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Collections;
-import java.util.HashMap;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.joda.time.Instant;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Tests for serialization and deserialization of {@link PipelineOptions} in {@link DoFnOperator}.
- */
-public class PipelineOptionsTest {
-
- /**
- * Pipeline options.
- */
- public interface MyOptions extends FlinkPipelineOptions {
- @Description("Bla bla bla")
- @Default.String("Hello")
- String getTestOption();
- void setTestOption(String value);
- }
-
- private static MyOptions options;
- private static SerializedPipelineOptions serializedOptions;
-
- private static final String[] args = new String[]{"--testOption=nothing"};
-
- @BeforeClass
- public static void beforeTest() {
- options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
- serializedOptions = new SerializedPipelineOptions(options);
- }
-
- @Test
- public void testDeserialization() {
- MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class);
- assertEquals("nothing", deserializedOptions.getTestOption());
- }
-
- @Test
- public void testIgnoredFieldSerialization() {
- FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
- options.setStateBackend(new MemoryStateBackend());
-
- FlinkPipelineOptions deserialized =
- new SerializedPipelineOptions(options).getPipelineOptions().as(FlinkPipelineOptions.class);
-
- assertNull(deserialized.getStateBackend());
- }
-
- @Test
- public void testCaching() {
- PipelineOptions deserializedOptions =
- serializedOptions.getPipelineOptions().as(PipelineOptions.class);
-
- assertNotNull(deserializedOptions);
- assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
- assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
- assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
- }
-
- @Test(expected = Exception.class)
- public void testNonNull() {
- new SerializedPipelineOptions(null);
- }
-
- @Test(expected = Exception.class)
- public void parDoBaseClassPipelineOptionsNullTest() {
- DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
- new TestDoFn(),
- WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
- new TupleTag<String>("main-output"),
- Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<String>(),
- WindowingStrategy.globalDefault(),
- new HashMap<Integer, PCollectionView<?>>(),
- Collections.<PCollectionView<?>>emptyList(),
- null,
- null);
-
- }
-
- /**
- * Tests that PipelineOptions are present after serialization.
- */
- @Test
- public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
-
- DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
- new TestDoFn(),
- WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
- new TupleTag<String>("main-output"),
- Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<String>(),
- WindowingStrategy.globalDefault(),
- new HashMap<Integer, PCollectionView<?>>(),
- Collections.<PCollectionView<?>>emptyList(),
- options,
- null);
-
- final byte[] serialized = SerializationUtils.serialize(doFnOperator);
-
- @SuppressWarnings("unchecked")
- DoFnOperator<Object, Object, Object> deserialized =
- (DoFnOperator<Object, Object, Object>) SerializationUtils.deserialize(serialized);
-
- TypeInformation<WindowedValue<Object>> typeInformation = TypeInformation.of(
- new TypeHint<WindowedValue<Object>>() {});
-
- OneInputStreamOperatorTestHarness<WindowedValue<Object>, Object> testHarness =
- new OneInputStreamOperatorTestHarness<>(deserialized,
- typeInformation.createSerializer(new ExecutionConfig()));
-
- testHarness.open();
-
- // execute once to access options
- testHarness.processElement(new StreamRecord<>(
- WindowedValue.of(
- new Object(),
- Instant.now(),
- GlobalWindow.INSTANCE,
- PaneInfo.NO_FIRING)));
-
- testHarness.close();
-
- }
-
-
- private static class TestDoFn extends DoFn<String, String> {
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Assert.assertNotNull(c.getPipelineOptions());
- Assert.assertEquals(
- options.getTestOption(),
- c.getPipelineOptions().as(MyOptions.class).getTestOption());
- }
- }
-}
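The pattern these tests exercise, distilled: SerializedPipelineOptions is the serializable envelope that carries PipelineOptions into Flink tasks, and the options must survive a Java-serialization round trip intact. A minimal sketch using the same classes as the test above (hypothetical snippet, not part of this commit):

    import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.commons.lang3.SerializationUtils;

    public class OptionsRoundTripSketch {
      public static void main(String[] args) {
        PipelineOptionsTest.MyOptions opts =
            PipelineOptionsFactory.fromArgs("--testOption=nothing")
                .as(PipelineOptionsTest.MyOptions.class);
        // Java-serialize the envelope, as Flink does when shipping an operator.
        byte[] bytes = SerializationUtils.serialize(new SerializedPipelineOptions(opts));
        SerializedPipelineOptions back = SerializationUtils.deserialize(bytes);
        // The rehydrated options keep the values set on the command line.
        System.out.println(
            back.getPipelineOptions().as(PipelineOptionsTest.MyOptions.class).getTestOption());
        // prints: nothing
      }
    }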