You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/15 17:06:57 UTC
[02/13] incubator-beam git commit: [flink] restructure and cleanup
Maven layout
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
deleted file mode 100644
index 6cf46e5..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ /dev/null
@@ -1,715 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-import com.google.cloud.dataflow.sdk.util.state.*;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import org.apache.flink.util.InstantiationUtil;
-import org.joda.time.Instant;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.*;
-
-/**
- * An implementation of the Beam {@link StateInternals}. This implementation simply keeps elements in memory.
- * This state is periodically checkpointed by Flink, for fault-tolerance.
- *
- * TODO: State should be rewritten to redirect to Flink per-key state so that coders and combiners don't need
- * to be serialized along with encoded values when snapshotting.
- */
-public class FlinkStateInternals<K> implements StateInternals<K> {
-
- private final K key;
-
- private final Coder<K> keyCoder;
-
- private final Coder<? extends BoundedWindow> windowCoder;
-
- private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
-
- private Instant watermarkHoldAccessor;
-
- public FlinkStateInternals(K key,
- Coder<K> keyCoder,
- Coder<? extends BoundedWindow> windowCoder,
- OutputTimeFn<? super BoundedWindow> outputTimeFn) {
- this.key = key;
- this.keyCoder = keyCoder;
- this.windowCoder = windowCoder;
- this.outputTimeFn = outputTimeFn;
- }
-
- public Instant getWatermarkHold() {
- return watermarkHoldAccessor;
- }
-
- /**
- * This is the interface state has to implement in order for it to be fault tolerant when
- * executed by the FlinkPipelineRunner.
- */
- private interface CheckpointableIF {
-
- boolean shouldPersist();
-
- void persistState(StateCheckpointWriter checkpointBuilder) throws IOException;
- }
-
- protected final StateTable<K> inMemoryState = new StateTable<K>() {
- @Override
- protected StateTag.StateBinder binderForNamespace(final StateNamespace namespace, final StateContext<?> c) {
- return new StateTag.StateBinder<K>() {
-
- @Override
- public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
- return new FlinkInMemoryValue<>(encodeKey(namespace, address), coder);
- }
-
- @Override
- public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
- return new FlinkInMemoryBag<>(encodeKey(namespace, address), elemCoder);
- }
-
- @Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
- }
-
- @Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
- }
-
- @Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
- }
-
- @Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(StateTag<? super K, WatermarkHoldState<W>> address, OutputTimeFn<? super W> outputTimeFn) {
- return new FlinkWatermarkHoldStateImpl<>(encodeKey(namespace, address), outputTimeFn);
- }
- };
- }
- };
-
- @Override
- public K getKey() {
- return key;
- }
-
- @Override
- public <StateT extends State> StateT state(StateNamespace namespace, StateTag<? super K, StateT> address) {
- return inMemoryState.get(namespace, address, null);
- }
-
- @Override
- public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
- return inMemoryState.get(namespace, address, c);
- }
-
- public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
- checkpointBuilder.writeInt(getNoOfElements());
-
- for (State location : inMemoryState.values()) {
- if (!(location instanceof CheckpointableIF)) {
- throw new IllegalStateException(String.format(
- "%s wasn't created by %s -- unable to persist it",
- location.getClass().getSimpleName(),
- getClass().getSimpleName()));
- }
- ((CheckpointableIF) location).persistState(checkpointBuilder);
- }
- }
-
- public void restoreState(StateCheckpointReader checkpointReader, ClassLoader loader)
- throws IOException, ClassNotFoundException {
-
- // the number of elements to read.
- int noOfElements = checkpointReader.getInt();
- for (int i = 0; i < noOfElements; i++) {
- decodeState(checkpointReader, loader);
- }
- }
-
- /**
- * We remove the first character which encodes the type of the stateTag ('s' for system
- * and 'u' for user). For more details check out the source of
- * {@link StateTags.StateTagBase#getId()}.
- */
- private void decodeState(StateCheckpointReader reader, ClassLoader loader)
- throws IOException, ClassNotFoundException {
-
- StateType stateItemType = StateType.deserialize(reader);
- ByteString stateKey = reader.getTag();
-
- // first decode the namespace and the tagId...
- String[] namespaceAndTag = stateKey.toStringUtf8().split("\\+");
- if (namespaceAndTag.length != 2) {
- throw new IllegalArgumentException("Invalid stateKey " + stateKey.toString() + ".");
- }
- StateNamespace namespace = StateNamespaces.fromString(namespaceAndTag[0], windowCoder);
-
- // ... decide if it is a system or user stateTag...
- char ownerTag = namespaceAndTag[1].charAt(0);
- if (ownerTag != 's' && ownerTag != 'u') {
- throw new RuntimeException("Invalid StateTag name.");
- }
- boolean isSystemTag = ownerTag == 's';
- String tagId = namespaceAndTag[1].substring(1);
-
- // ...then decode the coder (if there is one)...
- Coder<?> coder = null;
- switch (stateItemType) {
- case VALUE:
- case LIST:
- case ACCUMULATOR:
- ByteString coderBytes = reader.getData();
- coder = InstantiationUtil.deserializeObject(coderBytes.toByteArray(), loader);
- break;
- case WATERMARK:
- break;
- }
-
- // ...then decode the combiner function (if there is one)...
- CombineWithContext.KeyedCombineFnWithContext<? super K, ?, ?, ?> combineFn = null;
- switch (stateItemType) {
- case ACCUMULATOR:
- ByteString combinerBytes = reader.getData();
- combineFn = InstantiationUtil.deserializeObject(combinerBytes.toByteArray(), loader);
- break;
- case VALUE:
- case LIST:
- case WATERMARK:
- break;
- }
-
- //... and finally, depending on the type of the state being decoded,
- // 1) create the adequate stateTag,
- // 2) create the state container,
- // 3) restore the actual content.
- switch (stateItemType) {
- case VALUE: {
- StateTag stateTag = StateTags.value(tagId, coder);
- stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
- @SuppressWarnings("unchecked")
- FlinkInMemoryValue<?> value = (FlinkInMemoryValue<?>) inMemoryState.get(namespace, stateTag, null);
- value.restoreState(reader);
- break;
- }
- case WATERMARK: {
- @SuppressWarnings("unchecked")
- StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag = StateTags.watermarkStateInternal(tagId, outputTimeFn);
- stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
- @SuppressWarnings("unchecked")
- FlinkWatermarkHoldStateImpl<?> watermark = (FlinkWatermarkHoldStateImpl<?>) inMemoryState.get(namespace, stateTag, null);
- watermark.restoreState(reader);
- break;
- }
- case LIST: {
- StateTag stateTag = StateTags.bag(tagId, coder);
- stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
- FlinkInMemoryBag<?> bag = (FlinkInMemoryBag<?>) inMemoryState.get(namespace, stateTag, null);
- bag.restoreState(reader);
- break;
- }
- case ACCUMULATOR: {
- @SuppressWarnings("unchecked")
- StateTag<K, AccumulatorCombiningState<?, ?, ?>> stateTag = StateTags.keyedCombiningValueWithContext(tagId, (Coder) coder, combineFn);
- stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
- @SuppressWarnings("unchecked")
- FlinkInMemoryKeyedCombiningValue<?, ?, ?> combiningValue =
- (FlinkInMemoryKeyedCombiningValue<?, ?, ?>) inMemoryState.get(namespace, stateTag, null);
- combiningValue.restoreState(reader);
- break;
- }
- default:
- throw new RuntimeException("Unknown State Type " + stateItemType + ".");
- }
- }
-
- private ByteString encodeKey(StateNamespace namespace, StateTag<? super K, ?> address) {
- StringBuilder sb = new StringBuilder();
- try {
- namespace.appendTo(sb);
- sb.append('+');
- address.appendTo(sb);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return ByteString.copyFromUtf8(sb.toString());
- }
-
- private int getNoOfElements() {
- int noOfElements = 0;
- for (State state : inMemoryState.values()) {
- if (!(state instanceof CheckpointableIF)) {
- throw new RuntimeException("State Implementations used by the " +
- "Flink Dataflow Runner should implement the CheckpointableIF interface.");
- }
-
- if (((CheckpointableIF) state).shouldPersist()) {
- noOfElements++;
- }
- }
- return noOfElements;
- }
-
- private final class FlinkInMemoryValue<T> implements ValueState<T>, CheckpointableIF {
-
- private final ByteString stateKey;
- private final Coder<T> elemCoder;
-
- private T value = null;
-
- public FlinkInMemoryValue(ByteString stateKey, Coder<T> elemCoder) {
- this.stateKey = stateKey;
- this.elemCoder = elemCoder;
- }
-
- @Override
- public void clear() {
- value = null;
- }
-
- @Override
- public void write(T input) {
- this.value = input;
- }
-
- @Override
- public T read() {
- return value;
- }
-
- @Override
- public ValueState<T> readLater() {
- // Ignore
- return this;
- }
-
- @Override
- public boolean shouldPersist() {
- return value != null;
- }
-
- @Override
- public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
- if (value != null) {
- // serialize the coder.
- byte[] coder = InstantiationUtil.serializeObject(elemCoder);
-
- // encode the value into a ByteString
- ByteString.Output stream = ByteString.newOutput();
- elemCoder.encode(value, stream, Coder.Context.OUTER);
- ByteString data = stream.toByteString();
-
- checkpointBuilder.addValueBuilder()
- .setTag(stateKey)
- .setData(coder)
- .setData(data);
- }
- }
-
- public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
- ByteString valueContent = checkpointReader.getData();
- T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
- write(outValue);
- }
- }
-
- private final class FlinkWatermarkHoldStateImpl<W extends BoundedWindow>
- implements WatermarkHoldState<W>, CheckpointableIF {
-
- private final ByteString stateKey;
-
- private Instant minimumHold = null;
-
- private OutputTimeFn<? super W> outputTimeFn;
-
- public FlinkWatermarkHoldStateImpl(ByteString stateKey, OutputTimeFn<? super W> outputTimeFn) {
- this.stateKey = stateKey;
- this.outputTimeFn = outputTimeFn;
- }
-
- @Override
- public void clear() {
- // Even though we're clearing we can't remove this from the in-memory state map, since
- // other users may already have a handle on this WatermarkBagInternal.
- minimumHold = null;
- watermarkHoldAccessor = null;
- }
-
- @Override
- public void add(Instant watermarkHold) {
- if (minimumHold == null || minimumHold.isAfter(watermarkHold)) {
- watermarkHoldAccessor = watermarkHold;
- minimumHold = watermarkHold;
- }
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- return minimumHold == null;
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- // Ignore
- return this;
- }
- };
- }
-
- @Override
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
- }
-
- @Override
- public Instant read() {
- return minimumHold;
- }
-
- @Override
- public WatermarkHoldState<W> readLater() {
- // Ignore
- return this;
- }
-
- @Override
- public String toString() {
- return Objects.toString(minimumHold);
- }
-
- @Override
- public boolean shouldPersist() {
- return minimumHold != null;
- }
-
- @Override
- public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
- if (minimumHold != null) {
- checkpointBuilder.addWatermarkHoldsBuilder()
- .setTag(stateKey)
- .setTimestamp(minimumHold);
- }
- }
-
- public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
- Instant watermark = checkpointReader.getTimestamp();
- add(watermark);
- }
- }
-
-
- private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withContext(
- final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
- @Override
- public AccumT createAccumulator(K key, CombineWithContext.Context c) {
- return combineFn.createAccumulator(key);
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
- return combineFn.addInput(key, accumulator, value);
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
- return combineFn.mergeAccumulators(key, accumulators);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
- return combineFn.extractOutput(key, accumulator);
- }
- };
- }
-
- private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withKeyAndContext(
- final Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
- @Override
- public AccumT createAccumulator(K key, CombineWithContext.Context c) {
- return combineFn.createAccumulator();
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
- return combineFn.addInput(accumulator, value);
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
- return combineFn.mergeAccumulators(accumulators);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
- return combineFn.extractOutput(accumulator);
- }
- };
- }
-
- private final class FlinkInMemoryKeyedCombiningValue<InputT, AccumT, OutputT>
- implements AccumulatorCombiningState<InputT, AccumT, OutputT>, CheckpointableIF {
-
- private final ByteString stateKey;
- private final CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn;
- private final Coder<AccumT> accumCoder;
- private final CombineWithContext.Context context;
-
- private AccumT accum = null;
- private boolean isClear = true;
-
- private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
- Coder<AccumT> accumCoder,
- final StateContext<?> stateContext) {
- this(stateKey, withKeyAndContext(combineFn), accumCoder, stateContext);
- }
-
-
- private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
- Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
- Coder<AccumT> accumCoder,
- final StateContext<?> stateContext) {
- this(stateKey, withContext(combineFn), accumCoder, stateContext);
- }
-
- private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
- CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn,
- Coder<AccumT> accumCoder,
- final StateContext<?> stateContext) {
- Preconditions.checkNotNull(combineFn);
- Preconditions.checkNotNull(accumCoder);
-
- this.stateKey = stateKey;
- this.combineFn = combineFn;
- this.accumCoder = accumCoder;
- this.context = new CombineWithContext.Context() {
- @Override
- public PipelineOptions getPipelineOptions() {
- return stateContext.getPipelineOptions();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return stateContext.sideInput(view);
- }
- };
- accum = combineFn.createAccumulator(key, context);
- }
-
- @Override
- public void clear() {
- accum = combineFn.createAccumulator(key, context);
- isClear = true;
- }
-
- @Override
- public void add(InputT input) {
- isClear = false;
- accum = combineFn.addInput(key, accum, input, context);
- }
-
- @Override
- public AccumT getAccum() {
- return accum;
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public ReadableState<Boolean> readLater() {
- // Ignore
- return this;
- }
-
- @Override
- public Boolean read() {
- return isClear;
- }
- };
- }
-
- @Override
- public void addAccum(AccumT accum) {
- isClear = false;
- this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum), context);
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(key, accumulators, context);
- }
-
- @Override
- public OutputT read() {
- return combineFn.extractOutput(key, accum, context);
- }
-
- @Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
- // Ignore
- return this;
- }
-
- @Override
- public boolean shouldPersist() {
- return !isClear;
- }
-
- @Override
- public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
- if (!isClear) {
- // serialize the coder.
- byte[] coder = InstantiationUtil.serializeObject(accumCoder);
-
- // serialize the combiner.
- byte[] combiner = InstantiationUtil.serializeObject(combineFn);
-
- // encode the accumulator into a ByteString
- ByteString.Output stream = ByteString.newOutput();
- accumCoder.encode(accum, stream, Coder.Context.OUTER);
- ByteString data = stream.toByteString();
-
- // put the flag that the next serialized element is an accumulator
- checkpointBuilder.addAccumulatorBuilder()
- .setTag(stateKey)
- .setData(coder)
- .setData(combiner)
- .setData(data);
- }
- }
-
- public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
- ByteString valueContent = checkpointReader.getData();
- AccumT accum = this.accumCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
- addAccum(accum);
- }
- }
-
- private static final class FlinkInMemoryBag<T> implements BagState<T>, CheckpointableIF {
- private final List<T> contents = new ArrayList<>();
-
- private final ByteString stateKey;
- private final Coder<T> elemCoder;
-
- public FlinkInMemoryBag(ByteString stateKey, Coder<T> elemCoder) {
- this.stateKey = stateKey;
- this.elemCoder = elemCoder;
- }
-
- @Override
- public void clear() {
- contents.clear();
- }
-
- @Override
- public Iterable<T> read() {
- return contents;
- }
-
- @Override
- public BagState<T> readLater() {
- // Ignore
- return this;
- }
-
- @Override
- public void add(T input) {
- contents.add(input);
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public ReadableState<Boolean> readLater() {
- // Ignore
- return this;
- }
-
- @Override
- public Boolean read() {
- return contents.isEmpty();
- }
- };
- }
-
- @Override
- public boolean shouldPersist() {
- return !contents.isEmpty();
- }
-
- @Override
- public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
- if (!contents.isEmpty()) {
- // serialize the coder.
- byte[] coder = InstantiationUtil.serializeObject(elemCoder);
-
- checkpointBuilder.addListUpdatesBuilder()
- .setTag(stateKey)
- .setData(coder)
- .writeInt(contents.size());
-
- for (T item : contents) {
- // encode the element
- ByteString.Output stream = ByteString.newOutput();
- elemCoder.encode(item, stream, Coder.Context.OUTER);
- ByteString data = stream.toByteString();
-
- // add the data to the checkpoint.
- checkpointBuilder.setData(data);
- }
- }
- }
-
- public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
- int noOfValues = checkpointReader.getInt();
- for (int j = 0; j < noOfValues; j++) {
- ByteString valueContent = checkpointReader.getData();
- T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
- add(outValue);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java
deleted file mode 100644
index 5aadccd..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import com.google.protobuf.ByteString;
-import org.apache.flink.core.memory.DataInputView;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-public class StateCheckpointReader {
-
- private final DataInputView input;
-
- public StateCheckpointReader(DataInputView in) {
- this.input = in;
- }
-
- public ByteString getTag() throws IOException {
- return ByteString.copyFrom(readRawData());
- }
-
- public String getTagToString() throws IOException {
- return input.readUTF();
- }
-
- public ByteString getData() throws IOException {
- return ByteString.copyFrom(readRawData());
- }
-
- public int getInt() throws IOException {
- validate();
- return input.readInt();
- }
-
- public byte getByte() throws IOException {
- validate();
- return input.readByte();
- }
-
- public Instant getTimestamp() throws IOException {
- validate();
- Long watermarkMillis = input.readLong();
- return new Instant(TimeUnit.MICROSECONDS.toMillis(watermarkMillis));
- }
-
- public <K> K deserializeKey(CoderTypeSerializer<K> keySerializer) throws IOException {
- return deserializeObject(keySerializer);
- }
-
- public <T> T deserializeObject(CoderTypeSerializer<T> objectSerializer) throws IOException {
- return objectSerializer.deserialize(input);
- }
-
- ///////// Helper Methods ///////
-
- private byte[] readRawData() throws IOException {
- validate();
- int size = input.readInt();
-
- byte[] serData = new byte[size];
- int bytesRead = input.read(serData);
- if (bytesRead != size) {
- throw new RuntimeException("Error while deserializing checkpoint. Not enough bytes in the input stream.");
- }
- return serData;
- }
-
- private void validate() {
- if (this.input == null) {
- throw new RuntimeException("StateBackend not initialized yet.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java
deleted file mode 100644
index b2dc33c..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-import com.google.cloud.dataflow.sdk.util.TimerInternals;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-public class StateCheckpointUtils {
-
- public static <K> void encodeState(Map<K, FlinkStateInternals<K>> perKeyStateInternals,
- StateCheckpointWriter writer, Coder<K> keyCoder) throws IOException {
- CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-
- int noOfKeys = perKeyStateInternals.size();
- writer.writeInt(noOfKeys);
- for (Map.Entry<K, FlinkStateInternals<K>> keyStatePair : perKeyStateInternals.entrySet()) {
- K key = keyStatePair.getKey();
- FlinkStateInternals<K> state = keyStatePair.getValue();
-
- // encode the key
- writer.serializeKey(key, keySerializer);
-
- // write the associated state
- state.persistState(writer);
- }
- }
-
- public static <K> Map<K, FlinkStateInternals<K>> decodeState(
- StateCheckpointReader reader,
- OutputTimeFn<? super BoundedWindow> outputTimeFn,
- Coder<K> keyCoder,
- Coder<? extends BoundedWindow> windowCoder,
- ClassLoader classLoader) throws IOException, ClassNotFoundException {
-
- int noOfKeys = reader.getInt();
- Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(noOfKeys);
- perKeyStateInternals.clear();
-
- CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
- for (int i = 0; i < noOfKeys; i++) {
-
- // decode the key.
- K key = reader.deserializeKey(keySerializer);
-
- //decode the state associated to the key.
- FlinkStateInternals<K> stateForKey =
- new FlinkStateInternals<>(key, keyCoder, windowCoder, outputTimeFn);
- stateForKey.restoreState(reader, classLoader);
- perKeyStateInternals.put(key, stateForKey);
- }
- return perKeyStateInternals;
- }
-
- ////////////// Encoding/Decoding the Timers ////////////////
-
-
- public static <K> void encodeTimers(Map<K, Set<TimerInternals.TimerData>> allTimers,
- StateCheckpointWriter writer,
- Coder<K> keyCoder) throws IOException {
- CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-
- int noOfKeys = allTimers.size();
- writer.writeInt(noOfKeys);
- for (Map.Entry<K, Set<TimerInternals.TimerData>> timersPerKey : allTimers.entrySet()) {
- K key = timersPerKey.getKey();
-
- // encode the key
- writer.serializeKey(key, keySerializer);
-
- // write the associated timers
- Set<TimerInternals.TimerData> timers = timersPerKey.getValue();
- encodeTimerDataForKey(writer, timers);
- }
- }
-
- public static <K> Map<K, Set<TimerInternals.TimerData>> decodeTimers(
- StateCheckpointReader reader,
- Coder<? extends BoundedWindow> windowCoder,
- Coder<K> keyCoder) throws IOException {
-
- int noOfKeys = reader.getInt();
- Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(noOfKeys);
- activeTimers.clear();
-
- CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
- for (int i = 0; i < noOfKeys; i++) {
-
- // decode the key.
- K key = reader.deserializeKey(keySerializer);
-
- // decode the associated timers.
- Set<TimerInternals.TimerData> timers = decodeTimerDataForKey(reader, windowCoder);
- activeTimers.put(key, timers);
- }
- return activeTimers;
- }
-
- private static void encodeTimerDataForKey(StateCheckpointWriter writer, Set<TimerInternals.TimerData> timers) throws IOException {
- // encode timers
- writer.writeInt(timers.size());
- for (TimerInternals.TimerData timer : timers) {
- String stringKey = timer.getNamespace().stringKey();
-
- writer.setTag(stringKey);
- writer.setTimestamp(timer.getTimestamp());
- writer.writeInt(timer.getDomain().ordinal());
- }
- }
-
- private static Set<TimerInternals.TimerData> decodeTimerDataForKey(
- StateCheckpointReader reader, Coder<? extends BoundedWindow> windowCoder) throws IOException {
-
- // decode the timers: first their number and then the content itself.
- int noOfTimers = reader.getInt();
- Set<TimerInternals.TimerData> timers = new HashSet<>(noOfTimers);
- for (int i = 0; i < noOfTimers; i++) {
- String stringKey = reader.getTagToString();
- Instant instant = reader.getTimestamp();
- TimeDomain domain = TimeDomain.values()[reader.getInt()];
-
- StateNamespace namespace = StateNamespaces.fromString(stringKey, windowCoder);
- timers.add(TimerInternals.TimerData.of(namespace, instant, domain));
- }
- return timers;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java
deleted file mode 100644
index 18e118a..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import com.google.protobuf.ByteString;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-public class StateCheckpointWriter {
-
- private final AbstractStateBackend.CheckpointStateOutputView output;
-
- public static StateCheckpointWriter create(AbstractStateBackend.CheckpointStateOutputView output) {
- return new StateCheckpointWriter(output);
- }
-
- private StateCheckpointWriter(AbstractStateBackend.CheckpointStateOutputView output) {
- this.output = output;
- }
-
- ///////// Creating the serialized versions of the different types of state held by dataflow ///////
-
- public StateCheckpointWriter addValueBuilder() throws IOException {
- validate();
- StateType.serialize(StateType.VALUE, this);
- return this;
- }
-
- public StateCheckpointWriter addWatermarkHoldsBuilder() throws IOException {
- validate();
- StateType.serialize(StateType.WATERMARK, this);
- return this;
- }
-
- public StateCheckpointWriter addListUpdatesBuilder() throws IOException {
- validate();
- StateType.serialize(StateType.LIST, this);
- return this;
- }
-
- public StateCheckpointWriter addAccumulatorBuilder() throws IOException {
- validate();
- StateType.serialize(StateType.ACCUMULATOR, this);
- return this;
- }
-
- ///////// Setting the tag for a given state element ///////
-
- public StateCheckpointWriter setTag(ByteString stateKey) throws IOException {
- return writeData(stateKey.toByteArray());
- }
-
- public StateCheckpointWriter setTag(String stateKey) throws IOException {
- output.writeUTF(stateKey);
- return this;
- }
-
-
- public <K> StateCheckpointWriter serializeKey(K key, CoderTypeSerializer<K> keySerializer) throws IOException {
- return serializeObject(key, keySerializer);
- }
-
- public <T> StateCheckpointWriter serializeObject(T object, CoderTypeSerializer<T> objectSerializer) throws IOException {
- objectSerializer.serialize(object, output);
- return this;
- }
-
- ///////// Write the actual serialized data //////////
-
- public StateCheckpointWriter setData(ByteString data) throws IOException {
- return writeData(data.toByteArray());
- }
-
- public StateCheckpointWriter setData(byte[] data) throws IOException {
- return writeData(data);
- }
-
- public StateCheckpointWriter setTimestamp(Instant timestamp) throws IOException {
- validate();
- output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis()));
- return this;
- }
-
- public StateCheckpointWriter writeInt(int number) throws IOException {
- validate();
- output.writeInt(number);
- return this;
- }
-
- public StateCheckpointWriter writeByte(byte b) throws IOException {
- validate();
- output.writeByte(b);
- return this;
- }
-
- ///////// Helper Methods ///////
-
- private StateCheckpointWriter writeData(byte[] data) throws IOException {
- validate();
- output.writeInt(data.length);
- output.write(data);
- return this;
- }
-
- private void validate() {
- if (this.output == null) {
- throw new RuntimeException("StateBackend not initialized yet.");
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java
deleted file mode 100644
index 5849773..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import java.io.IOException;
-
-/**
- * The available types of state, as provided by the Beam SDK. This class is used for serialization/deserialization
- * purposes.
- * */
-public enum StateType {
-
- VALUE(0),
-
- WATERMARK(1),
-
- LIST(2),
-
- ACCUMULATOR(3);
-
- private final int numVal;
-
- StateType(int value) {
- this.numVal = value;
- }
-
- public static void serialize(StateType type, StateCheckpointWriter output) throws IOException {
- if (output == null) {
- throw new IllegalArgumentException("Cannot write to a null output.");
- }
-
- if(type.numVal < 0 || type.numVal > 3) {
- throw new RuntimeException("Unknown State Type " + type + ".");
- }
-
- output.writeByte((byte) type.numVal);
- }
-
- public static StateType deserialize(StateCheckpointReader input) throws IOException {
- if (input == null) {
- throw new IllegalArgumentException("Cannot read from a null input.");
- }
-
- int typeInt = (int) input.getByte();
- if(typeInt < 0 || typeInt > 3) {
- throw new RuntimeException("Unknown State Type " + typeInt + ".");
- }
-
- StateType resultType = null;
- for(StateType st: values()) {
- if(st.numVal == typeInt) {
- resultType = st;
- break;
- }
- }
- return resultType;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/resources/log4j.properties b/runners/flink/src/main/resources/log4j.properties
deleted file mode 100644
index 4daaad1..0000000
--- a/runners/flink/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO,console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java
deleted file mode 100644
index 3536f87..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.io.AvroIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.common.base.Joiner;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-
-public class AvroITCase extends JavaProgramTestBase {
-
- protected String resultPath;
- protected String tmpPath;
-
- public AvroITCase(){
- }
-
- static final String[] EXPECTED_RESULT = new String[] {
- "Joe red 3",
- "Mary blue 4",
- "Mark green 1",
- "Julia purple 5"
- };
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- tmpPath = getTempDirPath("tmp");
-
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- runProgram(tmpPath, resultPath);
- }
-
- private static void runProgram(String tmpPath, String resultPath) {
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- p
- .apply(Create.of(
- new User("Joe", 3, "red"),
- new User("Mary", 4, "blue"),
- new User("Mark", 1, "green"),
- new User("Julia", 5, "purple"))
- .withCoder(AvroCoder.of(User.class)))
-
- .apply(AvroIO.Write.to(tmpPath)
- .withSchema(User.class));
-
- p.run();
-
- p = FlinkTestPipeline.createForBatch();
-
- p
- .apply(AvroIO.Read.from(tmpPath).withSchema(User.class).withoutValidation())
-
- .apply(ParDo.of(new DoFn<User, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- User u = c.element();
- String result = u.getName() + " " + u.getFavoriteColor() + " " + u.getFavoriteNumber();
- c.output(result);
- }
- }))
-
- .apply(TextIO.Write.to(resultPath));
-
- p.run();
- }
-
- private static class User {
-
- private String name;
- private int favoriteNumber;
- private String favoriteColor;
-
- public User() {}
-
- public User(String name, int favoriteNumber, String favoriteColor) {
- this.name = name;
- this.favoriteNumber = favoriteNumber;
- this.favoriteColor = favoriteColor;
- }
-
- public String getName() {
- return name;
- }
-
- public String getFavoriteColor() {
- return favoriteColor;
- }
-
- public int getFavoriteNumber() {
- return favoriteNumber;
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java
deleted file mode 100644
index 5ae0e83..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.common.base.Joiner;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class FlattenizeITCase extends JavaProgramTestBase {
-
- private String resultPath;
- private String resultPath2;
-
- private static final String[] words = {"hello", "this", "is", "a", "DataSet!"};
- private static final String[] words2 = {"hello", "this", "is", "another", "DataSet!"};
- private static final String[] words3 = {"hello", "this", "is", "yet", "another", "DataSet!"};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- resultPath2 = getTempDirPath("result2");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- String join = Joiner.on('\n').join(words);
- String join2 = Joiner.on('\n').join(words2);
- String join3 = Joiner.on('\n').join(words3);
- compareResultsByLinesInMemory(join + "\n" + join2, resultPath);
- compareResultsByLinesInMemory(join + "\n" + join2 + "\n" + join3, resultPath2);
- }
-
-
- @Override
- protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- PCollection<String> p1 = p.apply(Create.of(words));
- PCollection<String> p2 = p.apply(Create.of(words2));
-
- PCollectionList<String> list = PCollectionList.of(p1).and(p2);
-
- list.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath));
-
- PCollection<String> p3 = p.apply(Create.of(words3));
-
- PCollectionList<String> list2 = list.and(p3);
-
- list2.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath2));
-
- p.run();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
deleted file mode 100644
index aadda24..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-
-/**
- * {@link com.google.cloud.dataflow.sdk.Pipeline} for testing Dataflow programs on the
- * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}.
- */
-public class FlinkTestPipeline extends Pipeline {
-
- /**
- * Creates and returns a new test pipeline for batch execution.
- *
- * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
- * {@link Pipeline#run} to execute the pipeline and check the tests.
- */
- public static FlinkTestPipeline createForBatch() {
- return create(false);
- }
-
- /**
- * Creates and returns a new test pipeline for streaming execution.
- *
- * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
- * {@link Pipeline#run} to execute the pipeline and check the tests.
- *
- * @return The Test Pipeline
- */
- public static FlinkTestPipeline createForStreaming() {
- return create(true);
- }
-
- /**
- * Creates and returns a new test pipeline for streaming or batch execution.
- *
- * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
- * {@link Pipeline#run} to execute the pipeline and check the tests.
- *
- * @param streaming <code>True</code> for streaming mode, <code>False</code> for batch.
- * @return The Test Pipeline.
- */
- private static FlinkTestPipeline create(boolean streaming) {
- FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming);
- return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
- }
-
- private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,
- PipelineOptions options) {
- super(runner, options);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java
deleted file mode 100644
index f60056d..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.runners.flink.util.JoinExamples;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Joiner;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.Arrays;
-import java.util.List;
-
-
-/**
- * Unfortunately we need to copy the code from the Dataflow SDK because it is not public there.
- */
-public class JoinExamplesITCase extends JavaProgramTestBase {
-
- protected String resultPath;
-
- public JoinExamplesITCase(){
- }
-
- private static final TableRow row1 = new TableRow()
- .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
- .set("Actor1Name", "BANGKOK").set("SOURCEURL", "http://cnn.com");
- private static final TableRow row2 = new TableRow()
- .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
- .set("Actor1Name", "LAOS").set("SOURCEURL", "http://www.chicagotribune.com");
- private static final TableRow row3 = new TableRow()
- .set("ActionGeo_CountryCode", "BE").set("SQLDATE", "20141213")
- .set("Actor1Name", "AFGHANISTAN").set("SOURCEURL", "http://cnn.com");
- static final TableRow[] EVENTS = new TableRow[] {
- row1, row2, row3
- };
- static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS);
-
- private static final TableRow cc1 = new TableRow()
- .set("FIPSCC", "VM").set("HumanName", "Vietnam");
- private static final TableRow cc2 = new TableRow()
- .set("FIPSCC", "BE").set("HumanName", "Belgium");
- static final TableRow[] CCS = new TableRow[] {
- cc1, cc2
- };
- static final List<TableRow> CC_ARRAY = Arrays.asList(CCS);
-
- static final String[] JOINED_EVENTS = new String[] {
- "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, "
- + "url: http://www.chicagotribune.com",
- "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, "
- + "url: http://cnn.com",
- "Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, "
- + "url: http://cnn.com"
- };
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(JOINED_EVENTS), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY));
- PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY));
-
- PCollection<String> output = JoinExamples.joinEvents(input1, input2);
-
- output.apply(TextIO.Write.to(resultPath));
-
- p.run();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java
deleted file mode 100644
index 199602c..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.io.Serializable;
-
-public class MaybeEmptyTestITCase extends JavaProgramTestBase implements Serializable {
-
- protected String resultPath;
-
- protected final String expected = "test";
-
- public MaybeEmptyTestITCase() {
- }
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- p.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
- .apply(ParDo.of(
- new DoFn<Void, String>() {
- @Override
- public void processElement(DoFn<Void, String>.ProcessContext c) {
- c.output(expected);
- }
- })).apply(TextIO.Write.to(resultPath));
- p.run();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java
deleted file mode 100644
index 403de29..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TupleTagList;
-import com.google.common.base.Joiner;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.io.Serializable;
-
-public class ParDoMultiOutputITCase extends JavaProgramTestBase implements Serializable {
-
- private String resultPath;
-
- private static String[] expectedWords = {"MAAA", "MAAFOOO"};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on("\n").join(expectedWords), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO"));
-
- // Select words whose length is below a cut off,
- // plus the lengths of words that are above the cut off.
- // Also select words starting with "MARKER".
- final int wordLengthCutOff = 3;
- // Create tags to use for the main and side outputs.
- final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){};
- final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){};
- final TupleTag<String> markedWordsTag = new TupleTag<String>(){};
-
- PCollectionTuple results =
- words.apply(ParDo
- .withOutputTags(wordsBelowCutOffTag, TupleTagList.of(wordLengthsAboveCutOffTag)
- .and(markedWordsTag))
- .of(new DoFn<String, String>() {
- final TupleTag<String> specialWordsTag = new TupleTag<String>() {
- };
-
- public void processElement(ProcessContext c) {
- String word = c.element();
- if (word.length() <= wordLengthCutOff) {
- c.output(word);
- } else {
- c.sideOutput(wordLengthsAboveCutOffTag, word.length());
- }
- if (word.startsWith("MAA")) {
- c.sideOutput(markedWordsTag, word);
- }
-
- if (word.startsWith("SPECIAL")) {
- c.sideOutput(specialWordsTag, word);
- }
- }
- }));
-
- // Extract the PCollection results, by tag.
- PCollection<String> wordsBelowCutOff = results.get(wordsBelowCutOffTag);
- PCollection<Integer> wordLengthsAboveCutOff = results.get
- (wordLengthsAboveCutOffTag);
- PCollection<String> markedWords = results.get(markedWordsTag);
-
- markedWords.apply(TextIO.Write.to(resultPath));
-
- p.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
deleted file mode 100644
index 323c41b..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-
-public class ReadSourceITCase extends JavaProgramTestBase {
-
- protected String resultPath;
-
- public ReadSourceITCase(){
- }
-
- static final String[] EXPECTED_RESULT = new String[] {
- "1", "2", "3", "4", "5", "6", "7", "8", "9"};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- runProgram(resultPath);
- }
-
- private static void runProgram(String resultPath) {
-
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- PCollection<String> result = p
- .apply(Read.from(new ReadSource(1, 10)))
- .apply(ParDo.of(new DoFn<Integer, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element().toString());
- }
- }));
-
- result.apply(TextIO.Write.to(resultPath));
- p.run();
- }
-
-
- private static class ReadSource extends BoundedSource<Integer> {
- final int from;
- final int to;
-
- ReadSource(int from, int to) {
- this.from = from;
- this.to = to;
- }
-
- @Override
- public List<ReadSource> splitIntoBundles(long desiredShardSizeBytes, PipelineOptions options)
- throws Exception {
- List<ReadSource> res = new ArrayList<>();
- FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
- int numWorkers = flinkOptions.getParallelism();
- Preconditions.checkArgument(numWorkers > 0, "Number of workers should be larger than 0.");
-
- float step = 1.0f * (to - from) / numWorkers;
- for (int i = 0; i < numWorkers; ++i) {
- res.add(new ReadSource(Math.round(from + i * step), Math.round(from + (i + 1) * step)));
- }
- return res;
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- return 8 * (to - from);
- }
-
- @Override
- public boolean producesSortedKeys(PipelineOptions options) throws Exception {
- return true;
- }
-
- @Override
- public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
- return new RangeReader(this);
- }
-
- @Override
- public void validate() {}
-
- @Override
- public Coder<Integer> getDefaultOutputCoder() {
- return BigEndianIntegerCoder.of();
- }
-
- private class RangeReader extends BoundedReader<Integer> {
- private int current;
-
- public RangeReader(ReadSource source) {
- this.current = source.from - 1;
- }
-
- @Override
- public boolean start() throws IOException {
- return true;
- }
-
- @Override
- public boolean advance() throws IOException {
- current++;
- return (current < to);
- }
-
- @Override
- public Integer getCurrent() {
- return current;
- }
-
- @Override
- public void close() throws IOException {
- // Nothing
- }
-
- @Override
- public BoundedSource<Integer> getCurrentSource() {
- return ReadSource.this;
- }
- }
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java
deleted file mode 100644
index 524554a..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Joiner;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.Collections;
-import java.util.List;
-
-
-public class RemoveDuplicatesEmptyITCase extends JavaProgramTestBase {
-
- protected String resultPath;
-
- public RemoveDuplicatesEmptyITCase(){
- }
-
- static final String[] EXPECTED_RESULT = new String[] {};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- List<String> strings = Collections.emptyList();
-
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- PCollection<String> input =
- p.apply(Create.of(strings))
- .setCoder(StringUtf8Coder.of());
-
- PCollection<String> output =
- input.apply(RemoveDuplicates.<String>create());
-
- output.apply(TextIO.Write.to(resultPath));
- p.run();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java
deleted file mode 100644
index 54e92aa..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Joiner;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.Arrays;
-import java.util.List;
-
-
-public class RemoveDuplicatesITCase extends JavaProgramTestBase {
-
- protected String resultPath;
-
- public RemoveDuplicatesITCase(){
- }
-
- static final String[] EXPECTED_RESULT = new String[] {
- "k1", "k5", "k2", "k3"};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3");
-
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- PCollection<String> input =
- p.apply(Create.of(strings))
- .setCoder(StringUtf8Coder.of());
-
- PCollection<String> output =
- input.apply(RemoveDuplicates.<String>create());
-
- output.apply(TextIO.Write.to(resultPath));
- p.run();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java
deleted file mode 100644
index 7f73b83..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.io.Serializable;
-
-public class SideInputITCase extends JavaProgramTestBase implements Serializable {
-
- private static final String expected = "Hello!";
-
- protected String resultPath;
-
- @Override
- protected void testProgram() throws Exception {
-
-
- Pipeline p = FlinkTestPipeline.createForBatch();
-
-
- final PCollectionView<String> sidesInput = p
- .apply(Create.of(expected))
- .apply(View.<String>asSingleton());
-
- p.apply(Create.of("bli"))
- .apply(ParDo.of(new DoFn<String, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- String s = c.sideInput(sidesInput);
- c.output(s);
- }
- }).withSideInputs(sidesInput)).apply(TextIO.Write.to(resultPath));
-
- p.run();
- }
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expected, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java
deleted file mode 100644
index 8722fee..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.runners.flink.examples.TFIDF;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.Keys;
-import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Joiner;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.net.URI;
-
-
-public class TfIdfITCase extends JavaProgramTestBase {
-
- protected String resultPath;
-
- public TfIdfITCase(){
- }
-
- static final String[] EXPECTED_RESULT = new String[] {
- "a", "m", "n", "b", "c", "d"};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- Pipeline pipeline = FlinkTestPipeline.createForBatch();
-
- pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
-
- PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
- .apply(Create.of(
- KV.of(new URI("x"), "a b c d"),
- KV.of(new URI("y"), "a b c"),
- KV.of(new URI("z"), "a m n")))
- .apply(new TFIDF.ComputeTfIdf());
-
- PCollection<String> words = wordToUriAndTfIdf
- .apply(Keys.<String>create())
- .apply(RemoveDuplicates.<String>create());
-
- words.apply(TextIO.Write.to(resultPath));
-
- pipeline.run();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java
deleted file mode 100644
index 8ca978e..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.runners.flink.examples.WordCount;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.MapElements;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Joiner;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.Arrays;
-import java.util.List;
-
-
-public class WordCountITCase extends JavaProgramTestBase {
-
- protected String resultPath;
-
- public WordCountITCase(){
- }
-
- static final String[] WORDS_ARRAY = new String[] {
- "hi there", "hi", "hi sue bob",
- "hi sue", "", "bob hi"};
-
- static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
-
- static final String[] COUNTS_ARRAY = new String[] {
- "hi: 5", "there: 1", "sue: 2", "bob: 2"};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(COUNTS_ARRAY), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
-
- input
- .apply(new WordCount.CountWords())
- .apply(MapElements.via(new WordCount.FormatAsTextFn()))
- .apply(TextIO.Write.to(resultPath));
-
- p.run();
- }
-}
-