You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:15:00 UTC
[26/50] [abbrv] beam git commit: [BEAM-1994] Remove Flink examples package
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
new file mode 100644
index 0000000..2bf0bf1
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import com.google.common.collect.Iterators;
+import java.util.Collections;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.MapState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.SetState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+
+/**
+ * {@link StateInternals} that uses a Flink {@link OperatorStateBackend}
+ * to manage split-distributed (operator) state.
+ *
+ * <p>Elements in a ListState are redistributed to operators in round-robin fashion
+ * when restarting with a different parallelism.
+ *
+ * <p>Note: state is stored under the {@link StateTag} id only, so the key and the
+ * namespace do not partition the stored data. Only {@link BagState} is supported;
+ * binding any other state type throws {@link UnsupportedOperationException}.
+ */
+public class FlinkSplitStateInternals<K> implements StateInternals<K> {
+
+  private final OperatorStateBackend stateBackend;
+
+  /**
+   * Creates state internals backed by the given Flink operator state backend.
+   *
+   * @param stateBackend the Flink operator state backend holding the list states
+   */
+  public FlinkSplitStateInternals(OperatorStateBackend stateBackend) {
+    this.stateBackend = stateBackend;
+  }
+
+  /**
+   * Always returns {@code null}: this split (operator) state is not scoped to a key.
+   */
+  @Override
+  public K getKey() {
+    return null;
+  }
+
+  /**
+   * Equivalent to {@link #state(StateNamespace, StateTag, StateContext)} with
+   * {@link StateContexts#nullContext()}.
+   */
+  @Override
+  public <T extends State> T state(
+      final StateNamespace namespace,
+      StateTag<? super K, T> address) {
+
+    return state(namespace, address, StateContexts.nullContext());
+  }
+
+  /**
+   * Binds {@code address} to its Flink-backed implementation. Only bag state is
+   * supported; every other state type throws {@link UnsupportedOperationException}.
+   * The {@code context} is not used.
+   */
+  @Override
+  public <T extends State> T state(
+      final StateNamespace namespace,
+      StateTag<? super K, T> address,
+      final StateContext<?> context) {
+
+    return address.bind(new StateTag.StateBinder<K>() {
+
+      @Override
+      public <T> ValueState<T> bindValue(
+          StateTag<? super K, ValueState<T>> address,
+          Coder<T> coder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", ValueState.class.getSimpleName()));
+      }
+
+      @Override
+      public <T> BagState<T> bindBag(
+          StateTag<? super K, BagState<T>> address,
+          Coder<T> elemCoder) {
+
+        return new FlinkSplitBagState<>(stateBackend, address, namespace, elemCoder);
+      }
+
+      @Override
+      public <T> SetState<T> bindSet(
+          StateTag<? super K, SetState<T>> address,
+          Coder<T> elemCoder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", SetState.class.getSimpleName()));
+      }
+
+      @Override
+      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+          StateTag<? super K, MapState<KeyT, ValueT>> spec,
+          Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", MapState.class.getSimpleName()));
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          CombiningState<InputT, AccumT, OutputT>
+          bindCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+        throw new UnsupportedOperationException("bindCombiningValue is not supported.");
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+        throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          CombineWithContext.KeyedCombineFnWithContext<
+              ? super K, InputT, AccumT, OutputT> combineFn) {
+        throw new UnsupportedOperationException(
+            "bindKeyedCombiningValueWithContext is not supported.");
+      }
+
+      @Override
+      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+          StateTag<? super K, WatermarkHoldState<W>> address,
+          OutputTimeFn<? super W> outputTimeFn) {
+        // Report the state type that is actually unsupported here.
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
+      }
+    });
+  }
+
+  /**
+   * {@link BagState} stored as a Flink operator list state named after the tag id.
+   * The namespace is kept only for {@link #equals} and {@link #hashCode}.
+   */
+  private static class FlinkSplitBagState<K, T> implements BagState<T> {
+
+    private final ListStateDescriptor<T> descriptor;
+    private final OperatorStateBackend flinkStateBackend;
+    private final StateNamespace namespace;
+    private final StateTag<? super K, BagState<T>> address;
+
+    FlinkSplitBagState(
+        OperatorStateBackend flinkStateBackend,
+        StateTag<? super K, BagState<T>> address,
+        StateNamespace namespace,
+        Coder<T> coder) {
+      this.flinkStateBackend = flinkStateBackend;
+      this.namespace = namespace;
+      this.address = address;
+
+      CoderTypeInformation<T> typeInfo =
+          new CoderTypeInformation<>(coder);
+
+      descriptor = new ListStateDescriptor<>(address.getId(),
+          typeInfo.createSerializer(new ExecutionConfig()));
+    }
+
+    @Override
+    public void add(T input) {
+      try {
+        flinkStateBackend.getOperatorState(descriptor).add(input);
+      } catch (Exception e) {
+        throw new RuntimeException("Error updating state.", e);
+      }
+    }
+
+    @Override
+    public BagState<T> readLater() {
+      return this;
+    }
+
+    /** Returns the stored elements, or an empty list if the backend returns {@code null}. */
+    @Override
+    public Iterable<T> read() {
+      try {
+        Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
+        return result != null ? result : Collections.<T>emptyList();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
+            // PartitionableListState.get() returns an empty collection when there are no
+            // elements, whereas keyed list state returns null; handle both.
+            return result == null || Iterators.size(result.iterator()) == 0;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      try {
+        flinkStateBackend.getOperatorState(descriptor).clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error clearing state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkSplitBagState<?, ?> that = (FlinkSplitBagState<?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
new file mode 100644
index 0000000..4f961e5
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -0,0 +1,1053 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import com.google.common.collect.Lists;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.CombineContextFactory;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.MapState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.SetState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.joda.time.Instant;
+
+/**
+ * {@link StateInternals} that uses a Flink {@link KeyedStateBackend} to manage state.
+ *
+ * <p>Note: In the Flink streaming runner the key is always encoded
+ * using an {@link Coder} and stored in a {@link ByteBuffer}.
+ */
+public class FlinkStateInternals<K> implements StateInternals<K> {
+
+  private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+  private final Coder<K> keyCoder;
+
+  // On recovery these will not be properly set, because we don't know
+  // which watermark hold states exist in the Flink state backend.
+  private final Map<String, Instant> watermarkHolds = new HashMap<>();
+
+  /**
+   * Creates state internals on top of a Flink keyed state backend.
+   *
+   * @param flinkStateBackend keyed backend whose current key is the encoded Beam key
+   * @param keyCoder coder used by {@link #getKey()} to decode the current key bytes
+   */
+  public FlinkStateInternals(KeyedStateBackend<ByteBuffer> flinkStateBackend, Coder<K> keyCoder) {
+    this.flinkStateBackend = flinkStateBackend;
+    this.keyCoder = keyCoder;
+  }
+
+  /**
+   * Computes the earliest of all watermark holds currently tracked in memory.
+   *
+   * @return the minimum hold, or {@code new Instant(Long.MAX_VALUE)} when no hold is tracked
+   */
+  public Instant watermarkHold() {
+    long earliest = Long.MAX_VALUE;
+    for (Map.Entry<String, Instant> entry : watermarkHolds.entrySet()) {
+      long millis = entry.getValue().getMillis();
+      if (millis < earliest) {
+        earliest = millis;
+      }
+    }
+    return new Instant(earliest);
+  }
+
+ @Override
+ public K getKey() {
+ ByteBuffer keyBytes = flinkStateBackend.getCurrentKey();
+ try {
+ return CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
+ } catch (CoderException e) {
+ throw new RuntimeException("Error decoding key.", e);
+ }
+ }
+
+  /**
+   * Binds {@code address} for {@code namespace} without any state context; delegates to the
+   * three-argument overload with {@link StateContexts#nullContext()}.
+   */
+  @Override
+  public <T extends State> T state(
+      final StateNamespace namespace,
+      StateTag<? super K, T> address) {
+    StateContext<?> noContext = StateContexts.nullContext();
+    return state(namespace, address, noContext);
+  }
+
+  /**
+   * Binds {@code address} to a Flink-backed state implementation for {@code namespace}.
+   *
+   * <p>Value, bag, combining (plain, keyed and with-context) and watermark-hold states are
+   * supported; set and map states throw {@link UnsupportedOperationException}. The
+   * {@code context} is only used when binding a combining state with context.
+   */
+  @Override
+  public <T extends State> T state(
+      final StateNamespace namespace,
+      StateTag<? super K, T> address,
+      final StateContext<?> context) {
+    final FlinkStateInternals<K> internals = this;
+    final KeyedStateBackend<ByteBuffer> backend = flinkStateBackend;
+
+    StateTag.StateBinder<K> binder = new StateTag.StateBinder<K>() {
+
+      @Override
+      public <T> ValueState<T> bindValue(
+          StateTag<? super K, ValueState<T>> tag,
+          Coder<T> coder) {
+        return new FlinkValueState<>(backend, tag, namespace, coder);
+      }
+
+      @Override
+      public <T> BagState<T> bindBag(
+          StateTag<? super K, BagState<T>> tag,
+          Coder<T> elemCoder) {
+        return new FlinkBagState<>(backend, tag, namespace, elemCoder);
+      }
+
+      @Override
+      public <T> SetState<T> bindSet(
+          StateTag<? super K, SetState<T>> tag,
+          Coder<T> elemCoder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", SetState.class.getSimpleName()));
+      }
+
+      @Override
+      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+          StateTag<? super K, MapState<KeyT, ValueT>> spec,
+          Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", MapState.class.getSimpleName()));
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> tag,
+          Coder<AccumT> accumCoder,
+          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+        return new FlinkCombiningState<>(backend, tag, combineFn, namespace, accumCoder);
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> tag,
+          Coder<AccumT> accumCoder,
+          final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+        return new FlinkKeyedCombiningState<>(
+            backend, tag, combineFn, namespace, accumCoder, internals);
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> tag,
+          Coder<AccumT> accumCoder,
+          CombineWithContext.KeyedCombineFnWithContext<
+              ? super K, InputT, AccumT, OutputT> combineFn) {
+        // The combine context is derived from the caller-supplied state context.
+        CombineWithContext.Context combineContext =
+            CombineContextFactory.createFromStateContext(context);
+        return new FlinkCombiningStateWithContext<>(
+            backend, tag, combineFn, namespace, accumCoder, internals, combineContext);
+      }
+
+      @Override
+      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+          StateTag<? super K, WatermarkHoldState<W>> tag,
+          OutputTimeFn<? super W> outputTimeFn) {
+        return new FlinkWatermarkHoldState<>(backend, internals, tag, namespace, outputTimeFn);
+      }
+    };
+
+    return address.bind(binder);
+  }
+
+  /**
+   * {@link ValueState} backed by a Flink keyed value state, partitioned by the Beam
+   * namespace's string key. Equality is defined by namespace and tag.
+   */
+  private static class FlinkValueState<K, T> implements ValueState<T> {
+
+    private final StateNamespace namespace;
+    private final StateTag<? super K, ValueState<T>> address;
+    private final ValueStateDescriptor<T> flinkStateDescriptor;
+    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+
+    FlinkValueState(
+        KeyedStateBackend<ByteBuffer> flinkStateBackend,
+        StateTag<? super K, ValueState<T>> address,
+        StateNamespace namespace,
+        Coder<T> coder) {
+      this.namespace = namespace;
+      this.address = address;
+      this.flinkStateBackend = flinkStateBackend;
+
+      CoderTypeInformation<T> typeInformation = new CoderTypeInformation<>(coder);
+      this.flinkStateDescriptor =
+          new ValueStateDescriptor<>(address.getId(), typeInformation, null);
+    }
+
+    /** Looks up the Flink value state for the current key under this namespace. */
+    private org.apache.flink.api.common.state.ValueState<T> flinkState() throws Exception {
+      return flinkStateBackend.getPartitionedState(
+          namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor);
+    }
+
+    @Override
+    public void write(T input) {
+      try {
+        flinkState().update(input);
+      } catch (Exception e) {
+        throw new RuntimeException("Error updating state.", e);
+      }
+    }
+
+    @Override
+    public ValueState<T> readLater() {
+      return this;
+    }
+
+    @Override
+    public T read() {
+      try {
+        return flinkState().value();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public void clear() {
+      try {
+        flinkState().clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error clearing state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (o == null || o.getClass() != getClass()) {
+        return false;
+      }
+      FlinkValueState<?, ?> other = (FlinkValueState<?, ?>) o;
+      return namespace.equals(other.namespace) && address.equals(other.address);
+    }
+
+    @Override
+    public int hashCode() {
+      return 31 * namespace.hashCode() + address.hashCode();
+    }
+  }
+
+  /**
+   * {@link BagState} backed by a Flink keyed list state, partitioned by the Beam
+   * namespace's string key. Equality is defined by namespace and tag.
+   */
+  private static class FlinkBagState<K, T> implements BagState<T> {
+
+    private final StateNamespace namespace;
+    private final StateTag<? super K, BagState<T>> address;
+    private final ListStateDescriptor<T> flinkStateDescriptor;
+    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+
+    FlinkBagState(
+        KeyedStateBackend<ByteBuffer> flinkStateBackend,
+        StateTag<? super K, BagState<T>> address,
+        StateNamespace namespace,
+        Coder<T> coder) {
+
+      this.namespace = namespace;
+      this.address = address;
+      this.flinkStateBackend = flinkStateBackend;
+
+      CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder);
+
+      flinkStateDescriptor = new ListStateDescriptor<>(address.getId(), typeInfo);
+    }
+
+    /** Looks up the Flink list state for the current key under this namespace. */
+    private org.apache.flink.api.common.state.ListState<T> flinkListState() throws Exception {
+      return flinkStateBackend.getPartitionedState(
+          namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor);
+    }
+
+    @Override
+    public void add(T input) {
+      try {
+        flinkListState().add(input);
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to bag state.", e);
+      }
+    }
+
+    @Override
+    public BagState<T> readLater() {
+      return this;
+    }
+
+    /** Returns the stored elements, or an empty list if the backend returns {@code null}. */
+    @Override
+    public Iterable<T> read() {
+      try {
+        Iterable<T> result = flinkListState().get();
+        return result != null ? result : Collections.<T>emptyList();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    /**
+     * Reports whether the bag holds no elements. Both a {@code null} result and an empty
+     * iterable count as empty, matching {@link #read()} which maps {@code null} to an
+     * empty list.
+     */
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            Iterable<T> result = flinkListState().get();
+            return result == null || !result.iterator().hasNext();
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      try {
+        flinkListState().clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error clearing state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkBagState<?, ?> that = (FlinkBagState<?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+  /**
+   * {@link CombiningState} that stores the accumulator of a {@link Combine.CombineFn} in a
+   * Flink keyed value state, partitioned by the Beam namespace's string key. A missing
+   * accumulator is read as {@code null}. Equality is defined by namespace and tag.
+   */
+  private static class FlinkCombiningState<K, InputT, AccumT, OutputT>
+      implements CombiningState<InputT, AccumT, OutputT> {
+
+    private final StateNamespace namespace;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+    private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
+    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+
+    FlinkCombiningState(
+        KeyedStateBackend<ByteBuffer> flinkStateBackend,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
+        StateNamespace namespace,
+        Coder<AccumT> accumCoder) {
+      this.namespace = namespace;
+      this.address = address;
+      this.combineFn = combineFn;
+      this.flinkStateBackend = flinkStateBackend;
+
+      CoderTypeInformation<AccumT> accumTypeInfo = new CoderTypeInformation<>(accumCoder);
+      this.flinkStateDescriptor =
+          new ValueStateDescriptor<>(address.getId(), accumTypeInfo, null);
+    }
+
+    /** Looks up the Flink value state holding the accumulator for the current key. */
+    private org.apache.flink.api.common.state.ValueState<AccumT> accumulatorState()
+        throws Exception {
+      return flinkStateBackend.getPartitionedState(
+          namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor);
+    }
+
+    @Override
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
+      return this;
+    }
+
+    @Override
+    public void add(InputT value) {
+      try {
+        org.apache.flink.api.common.state.ValueState<AccumT> state = accumulatorState();
+        AccumT accumulator = state.value();
+        if (accumulator == null) {
+          accumulator = combineFn.createAccumulator();
+        }
+        state.update(combineFn.addInput(accumulator, value));
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state." , e);
+      }
+    }
+
+    @Override
+    public void addAccum(AccumT accum) {
+      try {
+        org.apache.flink.api.common.state.ValueState<AccumT> state = accumulatorState();
+        AccumT existing = state.value();
+        // With no stored accumulator the given one is stored as-is; otherwise merge.
+        state.update(existing == null
+            ? accum
+            : combineFn.mergeAccumulators(Lists.newArrayList(existing, accum)));
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state.", e);
+      }
+    }
+
+    @Override
+    public AccumT getAccum() {
+      try {
+        return accumulatorState().value();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return combineFn.mergeAccumulators(accumulators);
+    }
+
+    @Override
+    public OutputT read() {
+      try {
+        AccumT stored = accumulatorState().value();
+        // An absent accumulator yields the output of a freshly created one.
+        return combineFn.extractOutput(stored != null ? stored : combineFn.createAccumulator());
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            return accumulatorState().value() == null;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      try {
+        accumulatorState().clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error clearing state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (o == null || o.getClass() != getClass()) {
+        return false;
+      }
+      FlinkCombiningState<?, ?, ?, ?> other = (FlinkCombiningState<?, ?, ?, ?>) o;
+      return namespace.equals(other.namespace) && address.equals(other.address);
+    }
+
+    @Override
+    public int hashCode() {
+      return 31 * namespace.hashCode() + address.hashCode();
+    }
+  }
+
+  /**
+   * {@link CombiningState} that stores the accumulator of a {@link Combine.KeyedCombineFn}
+   * in a Flink keyed value state, partitioned by the Beam namespace's string key. The
+   * current key is obtained from {@link FlinkStateInternals#getKey()}, which decodes it
+   * with the key coder on every call, so each operation decodes it at most once.
+   */
+  private static class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
+      implements CombiningState<InputT, AccumT, OutputT> {
+
+    private final StateNamespace namespace;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+    private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
+    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+    private final FlinkStateInternals<K> flinkStateInternals;
+
+    FlinkKeyedCombiningState(
+        KeyedStateBackend<ByteBuffer> flinkStateBackend,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
+        StateNamespace namespace,
+        Coder<AccumT> accumCoder,
+        FlinkStateInternals<K> flinkStateInternals) {
+
+      this.namespace = namespace;
+      this.address = address;
+      this.combineFn = combineFn;
+      this.flinkStateBackend = flinkStateBackend;
+      this.flinkStateInternals = flinkStateInternals;
+
+      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
+
+      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+    }
+
+    /** Looks up the Flink value state holding the accumulator for the current key. */
+    private org.apache.flink.api.common.state.ValueState<AccumT> accumulatorState()
+        throws Exception {
+      return flinkStateBackend.getPartitionedState(
+          namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor);
+    }
+
+    @Override
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
+      return this;
+    }
+
+    @Override
+    public void add(InputT value) {
+      try {
+        org.apache.flink.api.common.state.ValueState<AccumT> state = accumulatorState();
+
+        AccumT current = state.value();
+        // Decode the key once instead of once per combineFn call.
+        K key = flinkStateInternals.getKey();
+        if (current == null) {
+          current = combineFn.createAccumulator(key);
+        }
+        current = combineFn.addInput(key, current, value);
+        state.update(current);
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state." , e);
+      }
+    }
+
+    @Override
+    public void addAccum(AccumT accum) {
+      try {
+        org.apache.flink.api.common.state.ValueState<AccumT> state = accumulatorState();
+
+        AccumT current = state.value();
+        if (current == null) {
+          state.update(accum);
+        } else {
+          current = combineFn.mergeAccumulators(
+              flinkStateInternals.getKey(),
+              Lists.newArrayList(current, accum));
+          state.update(current);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state.", e);
+      }
+    }
+
+    @Override
+    public AccumT getAccum() {
+      try {
+        return accumulatorState().value();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators);
+    }
+
+    /**
+     * Extracts the output for the stored accumulator, or for a freshly created
+     * accumulator when none is stored.
+     */
+    @Override
+    public OutputT read() {
+      try {
+        AccumT accum = accumulatorState().value();
+        // Decode the key once; it is needed for both createAccumulator and extractOutput.
+        K key = flinkStateInternals.getKey();
+        if (accum == null) {
+          accum = combineFn.createAccumulator(key);
+        }
+        return combineFn.extractOutput(key, accum);
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            return accumulatorState().value() == null;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      try {
+        accumulatorState().clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error clearing state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkKeyedCombiningState<?, ?, ?, ?> that =
+          (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+ private static class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
+ implements CombiningState<InputT, AccumT, OutputT> {
+
+ private final StateNamespace namespace;
+ private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+ private final CombineWithContext.KeyedCombineFnWithContext<
+ ? super K, InputT, AccumT, OutputT> combineFn;
+ private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
+ private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+ private final FlinkStateInternals<K> flinkStateInternals;
+ private final CombineWithContext.Context context;
+
+ FlinkCombiningStateWithContext(
+ KeyedStateBackend<ByteBuffer> flinkStateBackend,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ CombineWithContext.KeyedCombineFnWithContext<
+ ? super K, InputT, AccumT, OutputT> combineFn,
+ StateNamespace namespace,
+ Coder<AccumT> accumCoder,
+ FlinkStateInternals<K> flinkStateInternals,
+ CombineWithContext.Context context) {
+
+ this.namespace = namespace;
+ this.address = address;
+ this.combineFn = combineFn;
+ this.flinkStateBackend = flinkStateBackend;
+ this.flinkStateInternals = flinkStateInternals;
+ this.context = context;
+
+ CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
+
+ flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+ }
+
+ @Override
+ public CombiningState<InputT, AccumT, OutputT> readLater() {
+ return this;
+ }
+
+ @Override
+ public void add(InputT value) {
+ try {
+ org.apache.flink.api.common.state.ValueState<AccumT> state =
+ flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor);
+
+ AccumT current = state.value();
+ if (current == null) {
+ current = combineFn.createAccumulator(flinkStateInternals.getKey(), context);
+ }
+ current = combineFn.addInput(flinkStateInternals.getKey(), current, value, context);
+ state.update(current);
+ } catch (Exception e) {
+ throw new RuntimeException("Error adding to state." , e);
+ }
+ }
+
+ @Override
+ public void addAccum(AccumT accum) {
+ try {
+ org.apache.flink.api.common.state.ValueState<AccumT> state =
+ flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor);
+
+ AccumT current = state.value();
+ if (current == null) {
+ state.update(accum);
+ } else {
+ current = combineFn.mergeAccumulators(
+ flinkStateInternals.getKey(),
+ Lists.newArrayList(current, accum),
+ context);
+ state.update(current);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Error adding to state.", e);
+ }
+ }
+
+ @Override
+ public AccumT getAccum() {
+ try {
+ return flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).value();
+ } catch (Exception e) {
+ throw new RuntimeException("Error reading state.", e);
+ }
+ }
+
+ @Override
+ public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+ return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators, context);
+ }
+
+ @Override
+ public OutputT read() {
+ try {
+ org.apache.flink.api.common.state.ValueState<AccumT> state =
+ flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor);
+
+ AccumT accum = state.value();
+ return combineFn.extractOutput(flinkStateInternals.getKey(), accum, context);
+ } catch (Exception e) {
+ throw new RuntimeException("Error reading state.", e);
+ }
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return new ReadableState<Boolean>() {
+ @Override
+ public Boolean read() {
+ try {
+ return flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).value() == null;
+ } catch (Exception e) {
+ throw new RuntimeException("Error reading state.", e);
+ }
+
+ }
+
+ @Override
+ public ReadableState<Boolean> readLater() {
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public void clear() {
+ try {
+ flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).clear();
+ } catch (Exception e) {
+ throw new RuntimeException("Error clearing state.", e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ FlinkCombiningStateWithContext<?, ?, ?, ?> that =
+ (FlinkCombiningStateWithContext<?, ?, ?, ?>) o;
+
+ return namespace.equals(that.namespace) && address.equals(that.address);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = namespace.hashCode();
+ result = 31 * result + address.hashCode();
+ return result;
+ }
+ }
+
+  /**
+   * A {@link WatermarkHoldState} whose hold is stored in a Flink {@code ValueState<Instant>},
+   * scoped to this cell's namespace.
+   *
+   * <p>Every change made by {@link #add(Instant)} is mirrored into
+   * {@code flinkStateInternals.watermarkHolds} under the namespace's string key. {@link #clear()}
+   * removes that entry again.
+   */
+  private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
+      implements WatermarkHoldState<W> {
+    private final StateTag<? super K, WatermarkHoldState<W>> address;
+    private final OutputTimeFn<? super W> outputTimeFn;
+    private final StateNamespace namespace;
+    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+    private final FlinkStateInternals<K> flinkStateInternals;
+    private final ValueStateDescriptor<Instant> flinkStateDescriptor;
+
+    public FlinkWatermarkHoldState(
+        KeyedStateBackend<ByteBuffer> flinkStateBackend,
+        FlinkStateInternals<K> flinkStateInternals,
+        StateTag<? super K, WatermarkHoldState<W>> address,
+        StateNamespace namespace,
+        OutputTimeFn<? super W> outputTimeFn) {
+      this.address = address;
+      this.outputTimeFn = outputTimeFn;
+      this.namespace = namespace;
+      this.flinkStateBackend = flinkStateBackend;
+      this.flinkStateInternals = flinkStateInternals;
+
+      CoderTypeInformation<Instant> typeInfo = new CoderTypeInformation<>(InstantCoder.of());
+      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+    }
+
+    /** Returns the Flink value state holding the hold for this cell's namespace. */
+    private org.apache.flink.api.common.state.ValueState<Instant> flinkState()
+        throws Exception {
+      return flinkStateBackend.getPartitionedState(
+          namespace.stringKey(),
+          StringSerializer.INSTANCE,
+          flinkStateDescriptor);
+    }
+
+    @Override
+    public OutputTimeFn<? super W> getOutputTimeFn() {
+      return outputTimeFn;
+    }
+
+    @Override
+    public WatermarkHoldState<W> readLater() {
+      return this;
+    }
+
+    /** Reports whether no hold is stored for this cell's namespace. */
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            return flinkState().value() == null;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    /**
+     * Combines {@code value} with the current hold via the {@link OutputTimeFn}. When no hold
+     * exists yet, {@code value} is stored as-is. The result is mirrored into the internals'
+     * hold map.
+     */
+    @Override
+    public void add(Instant value) {
+      try {
+        org.apache.flink.api.common.state.ValueState<Instant> state = flinkState();
+
+        Instant current = state.value();
+        Instant combined = current == null ? value : outputTimeFn.combine(current, value);
+        state.update(combined);
+        flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined);
+      } catch (Exception e) {
+        throw new RuntimeException("Error updating state.", e);
+      }
+    }
+
+    /** Returns the current hold, or {@code null} when none is stored. */
+    @Override
+    public Instant read() {
+      try {
+        return flinkState().value();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    /** Drops the mirrored hold entry first, then clears the Flink state. */
+    @Override
+    public void clear() {
+      flinkStateInternals.watermarkHolds.remove(namespace.stringKey());
+      try {
+        flinkState().clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error clearing state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkWatermarkHoldState<?, ?> that = (FlinkWatermarkHoldState<?, ?>) o;
+
+      if (!address.equals(that.address)) {
+        return false;
+      }
+      if (!outputTimeFn.equals(that.outputTimeFn)) {
+        return false;
+      }
+      return namespace.equals(that.namespace);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = address.hashCode();
+      result = 31 * result + outputTimeFn.hashCode();
+      result = 31 * result + namespace.hashCode();
+      return result;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java
new file mode 100644
index 0000000..b38a520
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import java.io.DataOutputStream;
+
+/**
+ * This interface is used to checkpoint key-groups state.
+ */
+public interface KeyGroupCheckpointedOperator extends KeyGroupRestoringOperator {
+  /**
+   * Snapshots the state for a given {@code keyGroupIndex}.
+   *
+   * <p>AbstractStreamOperator would call this hook in
+   * AbstractStreamOperator.snapshotState() while iterating over the key groups.
+   *
+   * @param keyGroupIndex the id of the key-group to be put in the snapshot.
+   * @param out the stream to write to.
+   * @throws Exception declared so implementations can propagate any failure while writing the
+   *     key-group state.
+   */
+  void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java
new file mode 100644
index 0000000..2bdfc6e
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import java.io.DataInputStream;
+
+/**
+ * This interface is used to restore key-groups state.
+ */
+public interface KeyGroupRestoringOperator {
+  /**
+   * Restores the state for a given {@code keyGroupIndex}.
+   *
+   * @param keyGroupIndex the id of the key-group whose state is read from {@code in}.
+   * @param in the stream to read from.
+   * @throws Exception declared so implementations can propagate any failure while reading the
+   *     key-group state.
+   */
+  void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java
new file mode 100644
index 0000000..0004e9e
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal state implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/resources/log4j.properties b/runners/flink/src/main/resources/log4j.properties
new file mode 100644
index 0000000..4b6a708
--- /dev/null
+++ b/runners/flink/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF,console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java
new file mode 100644
index 0000000..10d6d9d
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.runners.flink.translation.types.EncodedValueComparator;
+import org.apache.beam.runners.flink.translation.types.EncodedValueTypeInformation;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.junit.Assert;
+
+/**
+ * Test for {@link EncodedValueComparator}.
+ */
+public class EncodedValueComparatorTest extends ComparatorTestBase<byte[]> {
+
+ @Override
+ protected TypeComparator<byte[]> createComparator(boolean ascending) {
+ return new EncodedValueTypeInformation().createComparator(ascending, new ExecutionConfig());
+ }
+
+ @Override
+ protected TypeSerializer<byte[]> createSerializer() {
+ return new EncodedValueTypeInformation().createSerializer(new ExecutionConfig());
+ }
+
+ @Override
+ protected void deepEquals(String message, byte[] should, byte[] is) {
+ Assert.assertArrayEquals(message, should, is);
+ }
+
+ @Override
+ protected byte[][] getSortedTestData() {
+ StringUtf8Coder coder = StringUtf8Coder.of();
+
+ try {
+ return new byte[][]{
+ CoderUtils.encodeToByteArray(coder, ""),
+ CoderUtils.encodeToByteArray(coder, "Lorem Ipsum Dolor Omit Longer"),
+ CoderUtils.encodeToByteArray(coder, "aaaa"),
+ CoderUtils.encodeToByteArray(coder, "abcd"),
+ CoderUtils.encodeToByteArray(coder, "abce"),
+ CoderUtils.encodeToByteArray(coder, "abdd"),
+ CoderUtils.encodeToByteArray(coder, "accd"),
+ CoderUtils.encodeToByteArray(coder, "bbcd")
+ };
+ } catch (CoderException e) {
+ throw new RuntimeException("Could not encode values.", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
new file mode 100644
index 0000000..d9d174c
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+/**
+ * Tests the proper registration of the Flink runner.
+ */
+public class FlinkRunnerRegistrarTest {
+
+  /** The runner resolves when referenced by its fully qualified class name. */
+  @Test
+  public void testFullName() {
+    assertEquals(FlinkRunner.class, runnerFor(FlinkRunner.class.getName()));
+  }
+
+  /** The runner resolves when referenced by its simple class name. */
+  @Test
+  public void testClassName() {
+    assertEquals(FlinkRunner.class, runnerFor(FlinkRunner.class.getSimpleName()));
+  }
+
+  /**
+   * Parses {@code --runner=<runnerName>} into pipeline options.
+   *
+   * @param runnerName the value passed to the {@code --runner} flag.
+   * @return the runner class the parsed options resolve to.
+   */
+  private static Class<?> runnerFor(String runnerName) {
+    String[] args = new String[] {String.format("--runner=%s", runnerName)};
+    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+    return opts.getRunner();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
new file mode 100644
index 0000000..d6240c4
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+
+/**
+ * {@link org.apache.beam.sdk.Pipeline} for testing Dataflow programs on the
+ * {@link FlinkRunner}.
+ */
+public class FlinkTestPipeline extends Pipeline {
+
+ /**
+ * Creates and returns a new test pipeline for batch execution.
+ *
+ * <p>Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call
+ * {@link Pipeline#run} to execute the pipeline and check the tests.
+ */
+ public static FlinkTestPipeline createForBatch() {
+ return create(false);
+ }
+
+ /**
+ * Creates and returns a new test pipeline for streaming execution.
+ *
+ * <p>Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call
+ * {@link Pipeline#run} to execute the pipeline and check the tests.
+ *
+ * @return The Test Pipeline
+ */
+ public static FlinkTestPipeline createForStreaming() {
+ return create(true);
+ }
+
+ /**
+ * Creates and returns a new test pipeline for streaming or batch execution.
+ *
+ * <p>Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call
+ * {@link Pipeline#run} to execute the pipeline and check the tests.
+ *
+ * @param streaming <code>True</code> for streaming mode, <code>False</code> for batch.
+ * @return The Test Pipeline.
+ */
+ private static FlinkTestPipeline create(boolean streaming) {
+ TestFlinkRunner flinkRunner = TestFlinkRunner.create(streaming);
+ return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
+ }
+
+ private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,
+ PipelineOptions options) {
+ super(runner, options);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
new file mode 100644
index 0000000..06187f6
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for serialization and deserialization of {@link PipelineOptions} in {@link DoFnOperator}.
+ */
+public class PipelineOptionsTest {
+
+ /**
+ * Pipeline options.
+ */
+ public interface MyOptions extends FlinkPipelineOptions {
+ @Description("Bla bla bla")
+ @Default.String("Hello")
+ String getTestOption();
+ void setTestOption(String value);
+ }
+
+ private static MyOptions options;
+ private static SerializedPipelineOptions serializedOptions;
+
+ private static final String[] args = new String[]{"--testOption=nothing"};
+
+ @BeforeClass
+ public static void beforeTest() {
+ options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
+ serializedOptions = new SerializedPipelineOptions(options);
+ }
+
+ @Test
+ public void testDeserialization() {
+ MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class);
+ assertEquals("nothing", deserializedOptions.getTestOption());
+ }
+
+ @Test
+ public void testIgnoredFieldSerialization() {
+ FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setStateBackend(new MemoryStateBackend());
+
+ FlinkPipelineOptions deserialized =
+ new SerializedPipelineOptions(options).getPipelineOptions().as(FlinkPipelineOptions.class);
+
+ assertNull(deserialized.getStateBackend());
+ }
+
+ @Test
+ public void testCaching() {
+ PipelineOptions deserializedOptions =
+ serializedOptions.getPipelineOptions().as(PipelineOptions.class);
+
+ assertNotNull(deserializedOptions);
+ assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
+ assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
+ assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
+ }
+
+ @Test(expected = Exception.class)
+ public void testNonNull() {
+ new SerializedPipelineOptions(null);
+ }
+
+ @Test(expected = Exception.class)
+ public void parDoBaseClassPipelineOptionsNullTest() {
+ DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+ new TestDoFn(),
+ WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
+ new TupleTag<String>("main-output"),
+ Collections.<TupleTag<?>>emptyList(),
+ new DoFnOperator.DefaultOutputManagerFactory<String>(),
+ WindowingStrategy.globalDefault(),
+ new HashMap<Integer, PCollectionView<?>>(),
+ Collections.<PCollectionView<?>>emptyList(),
+ null,
+ null);
+
+ }
+
+ /**
+ * Tests that PipelineOptions are present after serialization.
+ */
+ @Test
+ public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
+
+ DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+ new TestDoFn(),
+ WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
+ new TupleTag<String>("main-output"),
+ Collections.<TupleTag<?>>emptyList(),
+ new DoFnOperator.DefaultOutputManagerFactory<String>(),
+ WindowingStrategy.globalDefault(),
+ new HashMap<Integer, PCollectionView<?>>(),
+ Collections.<PCollectionView<?>>emptyList(),
+ options,
+ null);
+
+ final byte[] serialized = SerializationUtils.serialize(doFnOperator);
+
+ @SuppressWarnings("unchecked")
+ DoFnOperator<Object, Object, Object> deserialized =
+ (DoFnOperator<Object, Object, Object>) SerializationUtils.deserialize(serialized);
+
+ TypeInformation<WindowedValue<Object>> typeInformation = TypeInformation.of(
+ new TypeHint<WindowedValue<Object>>() {});
+
+ OneInputStreamOperatorTestHarness<WindowedValue<Object>, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(deserialized,
+ typeInformation.createSerializer(new ExecutionConfig()));
+
+ testHarness.open();
+
+ // execute once to access options
+ testHarness.processElement(new StreamRecord<>(
+ WindowedValue.of(
+ new Object(),
+ Instant.now(),
+ GlobalWindow.INSTANCE,
+ PaneInfo.NO_FIRING)));
+
+ testHarness.close();
+
+ }
+
+
+ private static class TestDoFn extends DoFn<String, String> {
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ Assert.assertNotNull(c.getPipelineOptions());
+ Assert.assertEquals(
+ options.getTestOption(),
+ c.getPipelineOptions().as(MyOptions.class).getTestOption());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
new file mode 100644
index 0000000..44c9017
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.base.Joiner;
+import java.io.File;
+import java.net.URI;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Reads from a bounded source in batch execution.
+ *
+ * <p>Counts from 0 to 9 with {@link CountingInput}, formats each number as a string, writes the
+ * strings with {@link TextIO} into a temporary directory and compares the lines found there with
+ * {@code EXPECTED_RESULT}.
+ */
+public class ReadSourceITCase extends JavaProgramTestBase {
+
+  /** URI string of the temporary output directory; assigned in {@link #preSubmit()}. */
+  protected String resultPath;
+
+  public ReadSourceITCase() {
+  }
+
+  private static final String[] EXPECTED_RESULT = new String[] {
+      "0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+  /**
+   * Allocates the temporary result location and makes sure it exists as a directory.
+   *
+   * @throws IllegalStateException if the directory is missing and cannot be created
+   */
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+
+    // need to create the dir, otherwise Beam sinks don't
+    // work for these tests
+    File resultDir = new File(new URI(resultPath));
+    // mkdirs() also returns false when the directory already exists, so only treat it as a
+    // failure when there is still no directory afterwards.
+    if (!resultDir.mkdirs() && !resultDir.isDirectory()) {
+      throw new IllegalStateException("Could not create output dir: " + resultDir);
+    }
+  }
+
+  /** Compares the lines written under {@code resultPath} with {@code EXPECTED_RESULT}. */
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  /**
+   * Builds and runs the batch pipeline, writing the numbers 0..9 as text with the file prefix
+   * {@code part} inside the directory denoted by {@code resultPath}.
+   *
+   * <p>Kept static so the anonymous {@code DoFn} does not capture the enclosing test instance.
+   */
+  private static void runProgram(String resultPath) throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    PCollection<String> result = p
+        .apply(CountingInput.upTo(10))
+        .apply(ParDo.of(new DoFn<Long, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) throws Exception {
+            c.output(c.element().toString());
+          }
+        }));
+
+    result.apply(TextIO.Write.to(new URI(resultPath).getPath() + "/part"));
+
+    p.run();
+  }
+}
+
+
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
new file mode 100644
index 0000000..79b7882
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.base.Joiner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+/**
+ * Reads from a bounded source in streaming.
+ *
+ * <p>Emits the numbers 0 to 9, turns each into its string form and writes the strings with
+ * {@link TextIO}; the written lines are then checked against {@code EXPECTED_RESULT}.
+ */
+public class ReadSourceStreamingITCase extends StreamingProgramTestBase {
+
+  private static final String[] EXPECTED_RESULT = {
+      "0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+  /** Temporary location the pipeline writes to; assigned in {@link #preSubmit()}. */
+  protected String resultPath;
+
+  public ReadSourceStreamingITCase() {
+  }
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    String expectedLines = Joiner.on('\n').join(EXPECTED_RESULT);
+    compareResultsByLinesInMemory(expectedLines, resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  /**
+   * Builds and runs the streaming pipeline that writes "0".."9" to {@code outputPath}.
+   *
+   * <p>Static so that the anonymous {@code DoFn} captures no enclosing test instance.
+   */
+  private static void runProgram(String outputPath) {
+    Pipeline pipeline = FlinkTestPipeline.createForStreaming();
+
+    pipeline
+        .apply(CountingInput.upTo(10))
+        .apply(ParDo.of(new DoFn<Long, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext context) throws Exception {
+            context.output(context.element().toString());
+          }
+        }))
+        .apply(TextIO.Write.to(outputPath));
+
+    pipeline.run();
+  }
+}
+
+
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
new file mode 100644
index 0000000..38b790e
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import static org.junit.Assert.assertNotNull;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Tests the translation of custom Write sinks.
+ */
+public class WriteSinkITCase extends JavaProgramTestBase {
+
+ protected String resultPath;
+
+ public WriteSinkITCase(){
+ }
+
+ static final String[] EXPECTED_RESULT = new String[] {
+ "Joe red 3", "Mary blue 4", "Max yellow 23"};
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result-" + System.nanoTime());
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ runProgram(resultPath);
+ }
+
+ @Override
+ public void stopCluster() throws Exception {
+ try {
+ super.stopCluster();
+ } catch (final IOException ioe) {
+ if (ioe.getMessage().startsWith("Unable to delete file")) {
+ // that's ok for the test itself, just the OS playing with us on cleanup phase
+ }
+ }
+ }
+
+ private static void runProgram(String resultPath) {
+ Pipeline p = FlinkTestPipeline.createForBatch();
+
+ p.apply(Create.of(ImmutableList.copyOf(EXPECTED_RESULT))).setCoder(StringUtf8Coder.of())
+ .apply("CustomSink", Write.to(new MyCustomSink(resultPath)));
+
+ p.run();
+ }
+
+ /**
+ * Simple custom sink which writes to a file.
+ */
+ private static class MyCustomSink extends Sink<String> {
+
+ private final String resultPath;
+
+ public MyCustomSink(String resultPath) {
+ this.resultPath = resultPath;
+ }
+
+ @Override
+ public void validate(PipelineOptions options) {
+ assertNotNull(options);
+ }
+
+ @Override
+ public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) {
+ return new MyWriteOperation();
+ }
+
+ private class MyWriteOperation extends WriteOperation<String, String> {
+
+ @Override
+ public Coder<String> getWriterResultCoder() {
+ return StringUtf8Coder.of();
+ }
+
+ @Override
+ public void initialize(PipelineOptions options) throws Exception {
+
+ }
+
+ @Override
+ public void setWindowedWrites(boolean windowedWrites) {
+
+ }
+
+ @Override
+ public void finalize(Iterable<String> writerResults, PipelineOptions options)
+ throws Exception {
+
+ }
+
+ @Override
+ public Writer<String, String> createWriter(PipelineOptions options) throws Exception {
+ return new MyWriter();
+ }
+
+ @Override
+ public Sink<String> getSink() {
+ return MyCustomSink.this;
+ }
+
+ /**
+ * Simple Writer which writes to a file.
+ */
+ private class MyWriter extends Writer<String, String> {
+
+ private PrintWriter internalWriter;
+
+ @Override
+ public final void openWindowed(String uId,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ int shard,
+ int numShards) throws Exception {
+ throw new UnsupportedOperationException("Windowed writes not supported.");
+ }
+
+ @Override
+ public final void openUnwindowed(String uId, int shard, int numShards) throws Exception {
+ Path path = new Path(resultPath + "/" + uId);
+ FileSystem.get(new URI("file:///")).create(path, false);
+ internalWriter = new PrintWriter(new File(path.toUri()));
+ }
+
+ @Override
+ public void cleanup() throws Exception {
+
+ }
+
+ @Override
+ public void write(String value) throws Exception {
+ internalWriter.println(value);
+ }
+
+ @Override
+ public String close() throws Exception {
+ internalWriter.close();
+ return resultPath;
+ }
+
+ @Override
+ public WriteOperation<String, String> getWriteOperation() {
+ return MyWriteOperation.this;
+ }
+ }
+ }
+ }
+
+}
+