You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:00 UTC
[05/50] [abbrv] incubator-beam git commit: [runner] add streaming
support with checkpointing
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
new file mode 100644
index 0000000..0a0e301
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming;
+
+import com.dataartisans.flink.dataflow.translation.types.CoderTypeInformation;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.util.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+
+public class FlinkGroupByKeyWrapper {
+
+ /**
+ * Just an auxiliary interface to bypass the fact that java anonymous classes cannot implement
+ * multiple interfaces.
+ */
+ private interface KeySelectorWithQueryableResultType<K, V> extends KeySelector<WindowedValue<KV<K, V>>, K>, ResultTypeQueryable<K> {
+ }
+
+ public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) {
+ final Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+ final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder);
+
+ return inputDataStream.keyBy(
+ new KeySelectorWithQueryableResultType<K, V>() {
+
+ @Override
+ public K getKey(WindowedValue<KV<K, V>> value) throws Exception {
+ return value.getValue().getKey();
+ }
+
+ @Override
+ public TypeInformation<K> getProducedType() {
+ return keyTypeInfo;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
new file mode 100644
index 0000000..200c397
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowingInternals;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.util.Map;
+
+/**
+ * ParDo wrapper for a DoFn with several outputs. Every emitted value is wrapped in a
+ * {@link RawUnionValue} whose tag is the integer label registered for the output's
+ * {@link TupleTag}.
+ */
+public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, RawUnionValue> {
+
+  /** Tag identifying the main output. */
+  private final TupleTag<?> mainTag;
+
+  /** Mapping from output tag to the integer label used inside the {@link RawUnionValue}. */
+  private final Map<TupleTag<?>, Integer> outputLabels;
+
+  /**
+   * @param options pipeline options, forwarded to the superclass
+   * @param windowingStrategy windowing strategy, forwarded to the superclass
+   * @param doFn the user function, forwarded to the superclass
+   * @param mainTag tag of the main output; must not be null
+   * @param tagsToLabels label of every known output tag; must not be null
+   */
+  public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) {
+    super(options, windowingStrategy, doFn);
+    this.mainTag = Preconditions.checkNotNull(mainTag);
+    this.outputLabels = Preconditions.checkNotNull(tagsToLabels);
+  }
+
+  /**
+   * Emits a main-output value, labelled with the label registered for {@code mainTag}, keeping
+   * the windows and pane of the input element.
+   */
+  @Override
+  public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector) {
+    checkTimestamp(inElement, timestamp);
+    Integer mainLabel = outputLabels.get(mainTag);
+    RawUnionValue labelled = new RawUnionValue(mainLabel, output);
+    WindowedValue<RawUnionValue> windowed =
+        makeWindowedValue(labelled, timestamp, inElement.getWindows(), inElement.getPane());
+    collector.collect(windowed);
+  }
+
+  /**
+   * Emits a side-output value labelled with the label registered for {@code tag}. Values for a
+   * tag without a registered label are dropped silently.
+   */
+  @Override
+  public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector, TupleTag<T> tag) {
+    checkTimestamp(inElement, timestamp);
+    Integer sideLabel = outputLabels.get(tag);
+    if (sideLabel == null) {
+      // no label registered for this tag: nothing is emitted
+      return;
+    }
+    RawUnionValue labelled = new RawUnionValue(sideLabel, output);
+    WindowedValue<RawUnionValue> windowed =
+        makeWindowedValue(labelled, timestamp, inElement.getWindows(), inElement.getPane());
+    collector.collect(windowed);
+  }
+
+  /**
+   * Always fails: windowing internals are not offered by this wrapper.
+   *
+   * @throws RuntimeException on every call
+   */
+  @Override
+  public WindowingInternals<IN, OUT> windowingInternalsHelper(WindowedValue<IN> inElement, Collector<WindowedValue<RawUnionValue>> outCollector) {
+    String reason = "FlinkParDoBoundMultiWrapper is just an internal operator serving as " +
+        "an intermediate transformation for the ParDo.BoundMulti translation. windowingInternals() " +
+        "is not available in this class.";
+    throw new RuntimeException(reason);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
new file mode 100644
index 0000000..18d4249
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.*;
+import com.google.cloud.dataflow.sdk.util.state.StateInternals;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * ParDo wrapper for a DoFn with a single output: emitted values are forwarded as
+ * {@code WindowedValue<OUT>} without any further wrapping.
+ */
+public class FlinkParDoBoundWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, OUT> {
+
+  /**
+   * @param options pipeline options, forwarded to the superclass
+   * @param windowingStrategy windowing strategy, forwarded to the superclass
+   * @param doFn the user function, forwarded to the superclass
+   */
+  public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) {
+    super(options, windowingStrategy, doFn);
+  }
+
+  /**
+   * Emits {@code output} with the given timestamp, keeping the windows and pane of the input
+   * element. The timestamp is first checked via {@code checkTimestamp}.
+   */
+  @Override
+  public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<OUT>> collector) {
+    checkTimestamp(inElement, timestamp);
+    collector.collect(makeWindowedValue(
+        output,
+        timestamp,
+        inElement.getWindows(),
+        inElement.getPane()));
+  }
+
+  /**
+   * Always fails: this wrapper has no side outputs.
+   *
+   * @throws RuntimeException on every call
+   */
+  @Override
+  public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<OUT>> outCollector, TupleTag<T> tag) {
+    // Side outputs are rejected rather than ignored. This can be reached when a user does not
+    // register side outputs but then outputs using a freshly created TupleTag.
+    throw new RuntimeException("sideOutput() is not available in ParDo.Bound().");
+  }
+
+  /**
+   * Returns a {@link WindowingInternals} view bound to the given input element and collector.
+   * Only windowed output and the windows/pane of the input element are supported; state, timers
+   * and view data are rejected with an exception.
+   */
+  @Override
+  public WindowingInternals<IN, OUT> windowingInternalsHelper(final WindowedValue<IN> inElement, final Collector<WindowedValue<OUT>> collector) {
+    return new WindowingInternals<IN, OUT>() {
+      @Override
+      public StateInternals stateInternals() {
+        // NOTE(review): the exception type is kept as-is for compatibility with existing callers.
+        throw new NullPointerException("StateInternals are not available for ParDo.Bound().");
+      }
+
+      @Override
+      public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+        collector.collect(makeWindowedValue(output, timestamp, windows, pane));
+      }
+
+      @Override
+      public TimerInternals timerInternals() {
+        // NOTE(review): the exception type is kept as-is for compatibility with existing callers.
+        throw new NullPointerException("TimerInternals are not available for ParDo.Bound().");
+      }
+
+      @Override
+      public Collection<? extends BoundedWindow> windows() {
+        return inElement.getWindows();
+      }
+
+      @Override
+      public PaneInfo pane() {
+        return inElement.getPane();
+      }
+
+      @Override
+      public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+        throw new RuntimeException("writePCollectionViewData() not supported in Streaming mode.");
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
new file mode 100644
index 0000000..17e0746
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.io;
+
+import com.dataartisans.flink.dataflow.FlinkPipelineRunner;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * An {@link UnboundedSource} that only carries a native Flink
+ * {@link RichParallelSourceFunction}. The wrapped function is exposed through
+ * {@link #getFlinkSource()}; every reading-related method of the {@link UnboundedSource}
+ * contract throws.
+ */
+public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark> extends UnboundedSource<T, C> {
+
+  /** Message of every exception thrown by this class. */
+  private static final String FLINK_RUNNER_ONLY =
+      "Flink Sources are supported only when running with the FlinkPipelineRunner.";
+
+  private final PipelineOptions options;
+  private final RichParallelSourceFunction<T> flinkSource;
+
+  /**
+   * @param pipelineOptions options of the pipeline; must not be null and must name
+   *        {@link FlinkPipelineRunner} as the runner
+   * @param source the Flink source function to carry; must not be null
+   * @throws NullPointerException if an argument is null
+   * @throws RuntimeException if the configured runner is not {@link FlinkPipelineRunner}
+   */
+  public UnboundedFlinkSource(PipelineOptions pipelineOptions, RichParallelSourceFunction<T> source) {
+    // Null-check before anything is dereferenced; the runner check itself lives in validate().
+    options = Preconditions.checkNotNull(pipelineOptions);
+    flinkSource = Preconditions.checkNotNull(source);
+    validate();
+  }
+
+  /** Returns the wrapped Flink source function. */
+  public RichParallelSourceFunction<T> getFlinkSource() {
+    return this.flinkSource;
+  }
+
+  /**
+   * Not supported by this class.
+   *
+   * @throws RuntimeException on every call
+   */
+  @Override
+  public List<? extends UnboundedSource<T, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
+    throw new RuntimeException(FLINK_RUNNER_ONLY);
+  }
+
+  /**
+   * Not supported by this class.
+   *
+   * @throws RuntimeException on every call
+   */
+  @Override
+  public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C checkpointMark) {
+    throw new RuntimeException(FLINK_RUNNER_ONLY);
+  }
+
+  /**
+   * Not supported by this class.
+   *
+   * @throws RuntimeException on every call
+   */
+  @Nullable
+  @Override
+  public Coder<C> getCheckpointMarkCoder() {
+    throw new RuntimeException(FLINK_RUNNER_ONLY);
+  }
+
+
+  /**
+   * Checks that options and source are present and that the configured runner is
+   * {@link FlinkPipelineRunner}.
+   *
+   * @throws NullPointerException if options or source is null
+   * @throws RuntimeException if another runner is configured
+   */
+  @Override
+  public void validate() {
+    Preconditions.checkNotNull(options);
+    Preconditions.checkNotNull(flinkSource);
+    if (!options.getRunner().equals(FlinkPipelineRunner.class)) {
+      throw new RuntimeException(FLINK_RUNNER_ONLY);
+    }
+  }
+
+  /**
+   * Not supported by this class.
+   *
+   * @throws RuntimeException on every call
+   */
+  @Override
+  public Coder<T> getDefaultOutputCoder() {
+    throw new RuntimeException(FLINK_RUNNER_ONLY);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
new file mode 100644
index 0000000..2b0d6dc
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.io;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * An {@link UnboundedSource} that reads delimiter-separated text records from a TCP socket.
+ * The source is never split: {@link #generateInitialSplits} returns the source itself.
+ */
+public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> extends UnboundedSource<String, C> {
+
+  private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of();
+
+  private static final long serialVersionUID = 1L;
+
+  /** Default delay between connection attempts, in milliseconds. */
+  private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
+
+  /** Timeout handed to {@link Socket#connect}; zero is interpreted as an infinite timeout. */
+  private static final int CONNECTION_TIMEOUT_TIME = 0;
+
+  private final String hostname;
+  private final int port;
+  private final char delimiter;
+  private final long maxNumRetries;
+  private final long delayBetweenRetries;
+
+  /**
+   * Creates a source that waits {@value #DEFAULT_CONNECTION_RETRY_SLEEP} ms between connection
+   * attempts.
+   *
+   * @see #UnboundedSocketSource(String, int, char, long, long)
+   */
+  public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) {
+    this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
+  }
+
+  /**
+   * @param hostname host to connect to
+   * @param port port to connect to
+   * @param delimiter character that terminates a record
+   * @param maxNumRetries number of reconnection attempts, or -1 to retry forever
+   * @param delayBetweenRetries pause between attempts, in milliseconds
+   */
+  public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
+    this.hostname = hostname;
+    this.port = port;
+    this.delimiter = delimiter;
+    this.maxNumRetries = maxNumRetries;
+    this.delayBetweenRetries = delayBetweenRetries;
+  }
+
+  public String getHostname() {
+    return this.hostname;
+  }
+
+  public int getPort() {
+    return this.port;
+  }
+
+  public char getDelimiter() {
+    return this.delimiter;
+  }
+
+  public long getMaxNumRetries() {
+    return this.maxNumRetries;
+  }
+
+  public long getDelayBetweenRetries() {
+    return this.delayBetweenRetries;
+  }
+
+  /** Returns a one-element list holding this source; both arguments are ignored. */
+  @Override
+  public List<? extends UnboundedSource<String, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
+    return Collections.<UnboundedSource<String, C>>singletonList(this);
+  }
+
+  /** Creates a new, not yet connected reader; both arguments are ignored. */
+  @Override
+  public UnboundedReader<String> createReader(PipelineOptions options, @Nullable C checkpointMark) {
+    return new UnboundedSocketReader(this);
+  }
+
+  @Nullable
+  @Override
+  public Coder<C> getCheckpointMarkCoder() {
+    // Flink and Dataflow have different checkpointing mechanisms.
+    // In our case we do not need a coder.
+    return null;
+  }
+
+  /**
+   * Checks the connection parameters.
+   *
+   * @throws IllegalArgumentException if the hostname is null, the port is outside 1..65535,
+   *         {@code maxNumRetries} is below -1 or {@code delayBetweenRetries} is negative
+   */
+  @Override
+  public void validate() {
+    checkArgument(hostname != null, "hostname must not be null");
+    checkArgument(port > 0 && port < 65536, "port is out of range");
+    checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
+    checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
+  }
+
+  @Override
+  public Coder<String> getDefaultOutputCoder() {
+    return DEFAULT_SOCKET_CODER;
+  }
+
+  /**
+   * Reader that connects to the socket described by its source and splits the incoming
+   * character stream at the configured delimiter.
+   */
+  public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> implements Serializable {
+
+    private static final long serialVersionUID = 7526472295622776147L;
+    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
+
+    private final UnboundedSocketSource<?> source;
+
+    private Socket socket;
+    private BufferedReader reader;
+
+    /** True between a successful {@link #openConnection()} and the release of the connection. */
+    private boolean isRunning;
+
+    /** The record produced by the last successful {@link #advance()}. */
+    private String currentRecord;
+
+    public UnboundedSocketReader(UnboundedSocketSource source) {
+      this.source = source;
+    }
+
+    /** Opens the socket and the character reader on top of it, then marks the reader running. */
+    private void openConnection() throws IOException {
+      this.socket = new Socket();
+      this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), CONNECTION_TIMEOUT_TIME);
+      // NOTE(review): bytes are decoded with the platform default charset although the default
+      // output coder is StringUtf8Coder - confirm whether an explicit charset is wanted here.
+      this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
+      this.isRunning = true;
+    }
+
+    /**
+     * Closes reader and socket if they exist and marks the reader as not running. The socket
+     * is closed even when closing the reader fails.
+     */
+    private void releaseConnection() throws IOException {
+      this.isRunning = false;
+      try {
+        if (this.reader != null) {
+          this.reader.close();
+        }
+      } finally {
+        try {
+          if (this.socket != null) {
+            this.socket.close();
+          }
+        } finally {
+          this.reader = null;
+          this.socket = null;
+        }
+      }
+    }
+
+    /**
+     * Connects to the server and reads the first record.
+     *
+     * <p>On an {@link IOException} the half-open connection is released and, while the retry
+     * budget of the source allows it, a new attempt is made after the configured delay. An
+     * interrupt during that delay restores the thread's interrupt flag and ends the retries.
+     *
+     * @return the result of the first {@link #advance()} after a successful connect, or
+     *         {@code false} if no connection could be established
+     */
+    @Override
+    public boolean start() throws IOException {
+      final String endpoint = this.source.getHostname() + ':' + this.source.getPort();
+      int attempt = 0;
+      while (!isRunning) {
+        try {
+          openConnection();
+          LOG.info("Connected to server socket " + endpoint);
+
+          return advance();
+        } catch (IOException e) {
+          // Do not leak the socket of the failed attempt; this also resets isRunning so that
+          // a failure after a successful connect is retried as well.
+          try {
+            releaseConnection();
+          } catch (IOException ignored) {
+            // best effort: the connection is already considered broken
+          }
+
+          boolean retry = this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries();
+          if (!retry) {
+            LOG.info("Lost connection to server socket " + endpoint + ". Giving up.", e);
+            break;
+          }
+
+          LOG.info("Lost connection to server socket " + endpoint + ". Retrying in " + this.source.getDelayBetweenRetries() + " msecs...", e);
+          try {
+            Thread.sleep(this.source.getDelayBetweenRetries());
+          } catch (InterruptedException interrupted) {
+            // Preserve the interrupt for the caller and stop retrying.
+            Thread.currentThread().interrupt();
+            break;
+          }
+        }
+      }
+      LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort());
+      return false;
+    }
+
+    /**
+     * Reads characters until the delimiter is seen and stores the text before it as the current
+     * record; a carriage return directly before the delimiter is dropped.
+     *
+     * @return {@code true} if a complete record was read, {@code false} if the reader is not
+     *         running or the stream ended before a delimiter was seen
+     */
+    @Override
+    public boolean advance() throws IOException {
+      final StringBuilder buffer = new StringBuilder();
+      int data;
+      while (isRunning && (data = reader.read()) != -1) {
+        // check if the string is complete
+        if (data != this.source.getDelimiter()) {
+          buffer.append((char) data);
+        } else {
+          if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
+            buffer.setLength(buffer.length() - 1);
+          }
+          this.currentRecord = buffer.toString();
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /** Always returns an empty array. */
+    @Override
+    public byte[] getCurrentRecordId() throws NoSuchElementException {
+      return new byte[0];
+    }
+
+    @Override
+    public String getCurrent() throws NoSuchElementException {
+      return this.currentRecord;
+    }
+
+    /** Returns the time of the call, not a timestamp carried by the record. */
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return Instant.now();
+    }
+
+    /** Releases the connection; safe to call when no connection was ever established. */
+    @Override
+    public void close() throws IOException {
+      releaseConnection();
+      LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + ".");
+    }
+
+    /** Returns the time of the call. */
+    @Override
+    public Instant getWatermark() {
+      return Instant.now();
+    }
+
+    /** Always returns {@code null}. */
+    @Override
+    public CheckpointMark getCheckpointMark() {
+      return null;
+    }
+
+    @Override
+    public UnboundedSource<String, ?> getCurrentSource() {
+      return this.source;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
new file mode 100644
index 0000000..3e248a6
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.io;
+
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+/**
+ * Flink source function that drives an {@link UnboundedSource.UnboundedReader}: every record of
+ * the reader is emitted with its timestamp, and the reader's watermark is emitted from a timer
+ * that re-registers itself while the source is running.
+ */
+public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements EventTimeSourceFunction<WindowedValue<T>>, Triggerable {
+
+  private final String name;
+
+  // NOTE(review): the reader is never closed by this class - confirm who owns its lifecycle.
+  private final UnboundedSource.UnboundedReader<T> reader;
+
+  private StreamingRuntimeContext runtime = null;
+  private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;
+
+  /** Cleared by {@link #cancel()} and when the reader reports that it has no further record. */
+  private volatile boolean isRunning = false;
+
+  /**
+   * @param options pipeline options handed to the source when creating the reader
+   * @param transform the read transform supplying the name and the source; the reader is
+   *        created without a checkpoint mark
+   */
+  public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) {
+    this.name = transform.getName();
+    this.reader = transform.getSource().createReader(options, null);
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  /**
+   * Wraps {@code output} in the global window with {@link PaneInfo#NO_FIRING}. The
+   * {@code windows} and {@code pane} arguments are not used; a null timestamp is replaced by
+   * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
+   */
+  WindowedValue<T> makeWindowedValue(
+      T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+    if (timestamp == null) {
+      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+    return WindowedValue.of(output, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+  }
+
+  /**
+   * Starts the reader and emits its records until the reader has no further record or the
+   * source is cancelled.
+   *
+   * @throws RuntimeException if {@code ctx} is not a {@link StreamSource.ManualWatermarkContext}
+   */
+  @Override
+  public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {
+    if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
+      throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. " +
+          "Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
+    }
+
+    context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
+    runtime = (StreamingRuntimeContext) getRuntimeContext();
+
+    this.isRunning = reader.start();
+    setNextWatermarkTimer(this.runtime);
+
+    while (isRunning) {
+
+      // get it and its timestamp from the source
+      T item = reader.getCurrent();
+      Instant timestamp = reader.getCurrentTimestamp();
+
+      long milliseconds = timestamp.getMillis();
+
+      // write it to the output collector
+      synchronized (ctx.getCheckpointLock()) {
+        ctx.collectWithTimestamp(makeWindowedValue(item, timestamp, null, PaneInfo.NO_FIRING), milliseconds);
+      }
+
+      // Try to go to the next record. The flag is only ever cleared here, never set back to
+      // true, so a concurrent cancel() cannot be overwritten by a successful advance().
+      if (!reader.advance()) {
+        this.isRunning = false;
+      }
+    }
+  }
+
+  /** Asks the emit loop in {@link #run} to stop after the record it is currently handling. */
+  @Override
+  public void cancel() {
+    isRunning = false;
+  }
+
+  /**
+   * Timer callback: while the source is running, emits the reader's current watermark under
+   * the checkpoint lock and registers the next timer.
+   */
+  @Override
+  public void trigger(long timestamp) throws Exception {
+    if (this.isRunning) {
+      synchronized (context.getCheckpointLock()) {
+        long watermarkMillis = this.reader.getWatermark().getMillis();
+        context.emitWatermark(new Watermark(watermarkMillis));
+      }
+      setNextWatermarkTimer(this.runtime);
+    }
+  }
+
+  /** Registers this object as a timer one auto-watermark interval from now, if running. */
+  private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
+    if (this.isRunning) {
+      long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
+      long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval);
+      runtime.registerTimer(timeToNextWatermark, this);
+    }
+  }
+
+  /** Returns the wall-clock time, in milliseconds, at which the next watermark is due. */
+  private long getTimeToNextWatermark(long watermarkInterval) {
+    return System.currentTimeMillis() + watermarkInterval;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
new file mode 100644
index 0000000..4401eb3
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state;
+
+import com.dataartisans.flink.dataflow.translation.types.CoderTypeSerializer;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.TimerInternals;
+import com.google.cloud.dataflow.sdk.util.TimerOrElement;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+public abstract class AbstractFlinkTimerInternals<K, VIN> implements TimerInternals, Serializable {
+
+ private TimerOrElement<WindowedValue<KV<K, VIN>>> element;
+
+ private Instant currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+ public TimerOrElement<WindowedValue<KV<K, VIN>>> getElement() {
+ return this.element;
+ }
+
+ public void setElement(TimerOrElement<WindowedValue<KV<K, VIN>>> value) {
+ this.element = value;
+ }
+
+ public void setCurrentWatermark(Instant watermark) {
+ checkIfValidWatermark(watermark);
+ this.currentWatermark = watermark;
+ }
+
+ private void setCurrentWatermarkAfterRecovery(Instant watermark) {
+ if(!currentWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+ throw new RuntimeException("Explicitly setting the watermark is only allowed on " +
+ "initialization after recovery from a node failure. Apparently this is not " +
+ "the case here as the watermark is already set.");
+ }
+ this.currentWatermark = watermark;
+ }
+
+ @Override
+ public void setTimer(com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData timerKey) {
+ K key = element.isTimer() ? (K) element.key() : element.element().getValue().getKey();
+ registerTimer(key, timerKey);
+ }
+
+ protected abstract void registerTimer(K key, TimerData timerKey);
+
+ @Override
+ public void deleteTimer(com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData timerKey) {
+ K key = element.isTimer() ? (K) element.key() : element.element().getValue().getKey();
+ unregisterTimer(key, timerKey);
+ }
+
+ protected abstract void unregisterTimer(K key, TimerData timerKey);
+
+ @Override
+ public Instant currentProcessingTime() {
+ return Instant.now();
+ }
+
+ @Override
+ public Instant currentWatermarkTime() {
+ return this.currentWatermark;
+ }
+
+ private void checkIfValidWatermark(Instant newWatermark) {
+ if (currentWatermark.isAfter(newWatermark)) {
+ throw new IllegalArgumentException(String.format(
+ "Cannot set current watermark to %s. Newer watermarks " +
+ "must be no earlier than the current one (%s).",
+ newWatermark, this.currentWatermark));
+ }
+ }
+
+ public void encodeTimerInternals(DoFn.ProcessContext context,
+ StateCheckpointWriter writer,
+ KvCoder<K, VIN> kvCoder,
+ Coder<? extends BoundedWindow> windowCoder) throws IOException {
+ if (context == null) {
+ throw new RuntimeException("The Context has not been initialized.");
+ }
+
+ if (element != null && !element.isTimer()) {
+ // create the element coder
+ WindowedValue.WindowedValueCoder<KV<K, VIN>> elementCoder = WindowedValue
+ .getFullCoder(kvCoder, windowCoder);
+
+ CoderTypeSerializer<WindowedValue<KV<K, VIN>>> serializer =
+ new CoderTypeSerializer<>(elementCoder);
+
+ writer.writeByte((byte) 1);
+ writer.serializeObject(element.element(), serializer);
+ } else {
+ // just setting a flag to 0, meaning that there is no value.
+ writer.writeByte((byte) 0);
+ }
+ writer.setTimestamp(currentWatermark);
+ }
+
+ public void restoreTimerInternals(StateCheckpointReader reader,
+ KvCoder<K, VIN> kvCoder,
+ Coder<? extends BoundedWindow> windowCoder) throws IOException {
+
+ boolean isSet = (reader.getByte() == (byte) 1);
+ if (!isSet) {
+ this.element = null;
+ } else {
+ WindowedValue.WindowedValueCoder<KV<K, VIN>> elementCoder = WindowedValue
+ .getFullCoder(kvCoder, windowCoder);
+
+ CoderTypeSerializer<WindowedValue<KV<K, VIN>>> serializer =
+ new CoderTypeSerializer<>(elementCoder);
+
+ WindowedValue<KV<K, VIN>> elem = reader.deserializeObject(serializer);
+ this.element = TimerOrElement.element(elem);
+ }
+ setCurrentWatermarkAfterRecovery(reader.getTimestamp());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
new file mode 100644
index 0000000..03b8bb5
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -0,0 +1,533 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.state.*;
+import com.google.protobuf.ByteString;
+import org.apache.flink.util.InstantiationUtil;
+import org.joda.time.Instant;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.*;
+
+public class FlinkStateInternals<K> extends MergingStateInternals {
+
+ private final K key;
+
+ private final Coder<K> keyCoder;
+
+ private final Combine.KeyedCombineFn<K, ?, ?, ?> combineFn;
+
+ private final Coder<? extends BoundedWindow> windowCoder;
+
+ private Instant watermarkHoldAccessor;
+
+ /**
+  * Creates the state holder for a single key.
+  *
+  * @param key the key this state belongs to.
+  * @param keyCoder the coder of the key.
+  * @param windowCoder the coder of the windows; used to parse state namespaces on restore.
+  * @param combineFn the keyed combine function used to re-create combining state on restore.
+  */
+ public FlinkStateInternals(K key,
+     Coder<K> keyCoder,
+     Coder<? extends BoundedWindow> windowCoder,
+     Combine.KeyedCombineFn<K, ?, ?, ?> combineFn) {
+   this.key = key;
+   this.keyCoder = keyCoder;
+   this.windowCoder = windowCoder;
+   this.combineFn = combineFn;
+ }
+
+ /**
+  * Returns the watermark hold last recorded by one of the watermark states of this key,
+  * or {@code null} if none was recorded or the recording state has been cleared.
+  */
+ public Instant getWatermarkHold() {
+   return this.watermarkHoldAccessor;
+ }
+
+ /**
+ * This is the interface state has to implement in order for it to be fault tolerant when
+ * executed by the FlinkPipelineRunner.
+ */
+ private interface CheckpointableIF {
+
+ // Tells whether this state currently holds content that has to go into the checkpoint.
+ // Only states answering true are counted in the element count written ahead of the
+ // state entries, so persistState(...) must write an entry exactly when this is true.
+ boolean shouldPersist();
+
+ // Writes this state as one entry into the given checkpoint, or writes nothing when
+ // there is no content to persist.
+ void persistState(StateCheckpointWriter checkpointBuilder) throws IOException;
+ }
+
+ /**
+  * The in-memory table holding all state cells of this key. Cells are created on demand by
+  * the binder below; each cell is identified by the encoded combination of its namespace
+  * and its state tag.
+  */
+ protected final StateTable inMemoryState = new StateTable() {
+
+   @Override
+   protected StateTag.StateBinder binderForNamespace(final StateNamespace namespace) {
+     return new StateTag.StateBinder() {
+
+       @Override
+       public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) {
+         final ByteString cellKey = encodeKey(namespace, address);
+         return new FlinkInMemoryValue<>(cellKey, coder);
+       }
+
+       @Override
+       public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
+         final ByteString cellKey = encodeKey(namespace, address);
+         return new FlinkInMemoryBag<>(cellKey, elemCoder);
+       }
+
+       @Override
+       public <InputT, AccumT, OutputT> CombiningValueStateInternal<InputT, AccumT, OutputT> bindCombiningValue(
+           StateTag<CombiningValueStateInternal<InputT, AccumT, OutputT>> address,
+           Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+         final ByteString cellKey = encodeKey(namespace, address);
+         return new FlinkInMemoryCombiningValue<>(cellKey, combineFn, accumCoder);
+       }
+
+       @Override
+       public <T> WatermarkStateInternal bindWatermark(StateTag<WatermarkStateInternal> address) {
+         final ByteString cellKey = encodeKey(namespace, address);
+         return new FlinkWatermarkStateInternalImpl(cellKey);
+       }
+     };
+   }
+ };
+
+ /**
+  * Returns the state cell registered for the given namespace and tag in the in-memory table.
+  */
+ @Override
+ public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
+   final T cell = inMemoryState.get(namespace, address);
+   return cell;
+ }
+
+ /**
+  * Writes all state of this key into the checkpoint: first the number of entries that
+  * follow, then one entry per state cell that has content to persist.
+  *
+  * @param checkpointBuilder the checkpoint to write to.
+  * @throws IOException if writing to the checkpoint fails.
+  * @throws IllegalStateException if a state cell does not implement {@code CheckpointableIF}.
+  */
+ public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+   // header: how many entries the reader has to expect.
+   final int entryCount = getNoOfElements();
+   checkpointBuilder.writeInt(entryCount);
+
+   for (State location : inMemoryState.values()) {
+     if (location instanceof CheckpointableIF) {
+       ((CheckpointableIF) location).persistState(checkpointBuilder);
+       continue;
+     }
+     throw new IllegalStateException(String.format(
+         "%s wasn't created by %s -- unable to persist it",
+         location.getClass().getSimpleName(),
+         getClass().getSimpleName()));
+   }
+ }
+
+ /**
+  * Restores the state of this key from a checkpoint: reads the number of entries and then
+  * decodes that many state entries.
+  *
+  * @param checkpointReader the checkpoint to read from.
+  * @param loader the class loader used to deserialize the coders stored with the entries.
+  * @throws IOException if reading from the checkpoint fails.
+  * @throws ClassNotFoundException if a stored coder class cannot be resolved.
+  */
+ public void restoreState(StateCheckpointReader checkpointReader, ClassLoader loader)
+     throws IOException, ClassNotFoundException {
+
+   // the entry count precedes the entries themselves.
+   int remaining = checkpointReader.getInt();
+   while (remaining > 0) {
+     decodeState(checkpointReader, loader);
+     remaining--;
+   }
+ }
+
+ /**
+  * Decodes a single state entry from the checkpoint and restores its content into
+  * {@link #inMemoryState}.
+  *
+  * <p>An entry is read in this order: the {@link StateType}, the state key, the serialized
+  * coder (not present for watermark state) and finally the state content. The state key has
+  * the form {@code <namespace>+<tagId>}. The first character of the tag id encodes the type
+  * of the stateTag ('s' for system and 'u' for user) and is removed before the tag is
+  * re-created. For more details check out the source of
+  * {@link StateTags.StateTagBase#getId()}.
+  *
+  * <p>The key is split at its first '+' only, so that tag ids which themselves contain a
+  * '+' can still be restored. NOTE(review): this relies on the namespace string key never
+  * containing a '+' -- confirm against the {@code StateNamespace#stringKey()} implementations.
+  *
+  * @param reader the checkpoint to read from.
+  * @param loader the class loader used to deserialize the stored coder.
+  * @throws IOException if reading from the checkpoint fails.
+  * @throws ClassNotFoundException if the stored coder class cannot be resolved.
+  * @throws IllegalArgumentException if the state key has no separator or no tag id.
+  * @throws RuntimeException if the tag owner character or the state type is unknown.
+  */
+ private void decodeState(StateCheckpointReader reader, ClassLoader loader)
+     throws IOException, ClassNotFoundException {
+
+   StateType stateItemType = StateType.deserialize(reader);
+   ByteString stateKey = reader.getTag();
+
+   // first decode the namespace and the tagId...
+   // (the error message shows the decoded key, ByteString#toString() would not show its content)
+   String stateKeyString = stateKey.toStringUtf8();
+   int separator = stateKeyString.indexOf('+');
+   if (separator < 0 || separator == stateKeyString.length() - 1) {
+     throw new IllegalArgumentException("Invalid stateKey " + stateKeyString + ".");
+   }
+   String namespaceKey = stateKeyString.substring(0, separator);
+   String ownerAndTagId = stateKeyString.substring(separator + 1);
+   StateNamespace namespace = StateNamespaces.fromString(namespaceKey, windowCoder);
+
+   // ... decide if it is a system or user stateTag...
+   char ownerTag = ownerAndTagId.charAt(0);
+   if (ownerTag != 's' && ownerTag != 'u') {
+     throw new RuntimeException("Invalid StateTag name.");
+   }
+   boolean isSystemTag = ownerTag == 's';
+   String tagId = ownerAndTagId.substring(1);
+
+   // ...then decode the coder (if there is one)...
+   // The element type of the coder is not known here, hence the raw types below.
+   Coder coder = null;
+   if (!stateItemType.equals(StateType.WATERMARK)) {
+     ByteString coderBytes = reader.getData();
+     coder = InstantiationUtil.deserializeObject(coderBytes.toByteArray(), loader);
+   }
+
+   //... and finally, depending on the type of the state being decoded,
+   // 1) create the adequate stateTag,
+   // 2) create the state container,
+   // 3) restore the actual content.
+   switch (stateItemType) {
+     case VALUE: {
+       StateTag stateTag = StateTags.value(tagId, coder);
+       stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+       FlinkInMemoryValue<?> value = (FlinkInMemoryValue<?>) inMemoryState.get(namespace, stateTag);
+       value.restoreState(reader);
+       break;
+     }
+     case WATERMARK: {
+       StateTag<WatermarkStateInternal> stateTag = StateTags.watermarkStateInternal(tagId);
+       stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+       FlinkWatermarkStateInternalImpl watermark = (FlinkWatermarkStateInternalImpl) inMemoryState.get(namespace, stateTag);
+       watermark.restoreState(reader);
+       break;
+     }
+     case LIST: {
+       StateTag stateTag = StateTags.bag(tagId, coder);
+       stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+       FlinkInMemoryBag<?> bag = (FlinkInMemoryBag<?>) inMemoryState.get(namespace, stateTag);
+       bag.restoreState(reader);
+       break;
+     }
+     case ACCUMULATOR: {
+       StateTag stateTag = StateTags.combiningValue(tagId, coder, combineFn.forKey(this.key, keyCoder));
+       stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+       FlinkInMemoryCombiningValue<?, ?, ?> combiningValue = (FlinkInMemoryCombiningValue<?, ?, ?>) inMemoryState.get(namespace, stateTag);
+       combiningValue.restoreState(reader);
+       break;
+     }
+     default:
+       throw new RuntimeException("Unknown State Type " + stateItemType + ".");
+   }
+ }
+
+ /**
+  * Builds the key identifying a state cell: the namespace string key and the id of the
+  * state tag, joined by a '+' and encoded as UTF-8.
+  */
+ private ByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
+   StringBuilder cellKey = new StringBuilder();
+   cellKey.append(namespace.stringKey());
+   cellKey.append("+");
+   cellKey.append(address.getId());
+   return ByteString.copyFromUtf8(cellKey.toString());
+ }
+
+ /**
+  * Counts the state cells that currently have content to persist.
+  *
+  * @throws RuntimeException if a state cell does not implement {@code CheckpointableIF}.
+  */
+ private int getNoOfElements() {
+   int persistable = 0;
+   for (State state : inMemoryState.values()) {
+     if (state instanceof CheckpointableIF) {
+       persistable += ((CheckpointableIF) state).shouldPersist() ? 1 : 0;
+     } else {
+       throw new RuntimeException("State Implementations used by the " +
+           "Flink Dataflow Runner should implement the CheckpointableIF interface.");
+     }
+   }
+   return persistable;
+ }
+
+ /**
+  * Value state that keeps a single value in memory and can write it to, and read it back
+  * from, a checkpoint. An unset value is represented by {@code null} and is not persisted.
+  */
+ private final class FlinkInMemoryValue<T> implements ValueState<T>, CheckpointableIF {
+
+   private final ByteString stateKey;
+   private final Coder<T> elemCoder;
+
+   private T value = null;
+
+   public FlinkInMemoryValue(ByteString stateKey, Coder<T> elemCoder) {
+     this.stateKey = stateKey;
+     this.elemCoder = elemCoder;
+   }
+
+   @Override
+   public void clear() {
+     this.value = null;
+   }
+
+   @Override
+   public StateContents<T> get() {
+     // the returned handle reads the value at the time read() is called.
+     return new StateContents<T>() {
+       @Override
+       public T read() {
+         return value;
+       }
+     };
+   }
+
+   @Override
+   public void set(T input) {
+     value = input;
+   }
+
+   @Override
+   public boolean shouldPersist() {
+     return !(value == null);
+   }
+
+   @Override
+   public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+     if (value == null) {
+       // an unset value leaves no entry in the checkpoint.
+       return;
+     }
+
+     // the coder is stored next to the data.
+     byte[] serializedCoder = InstantiationUtil.serializeObject(elemCoder);
+
+     // encode the value into a ByteString
+     ByteString.Output encoded = ByteString.newOutput();
+     elemCoder.encode(value, encoded, Coder.Context.OUTER);
+     ByteString payload = encoded.toByteString();
+
+     // entry layout: type marker, state key, serialized coder, encoded value.
+     StateCheckpointWriter entry = checkpointBuilder.addValueBuilder().setTag(stateKey);
+     entry.setData(serializedCoder).setData(payload);
+   }
+
+   /**
+    * Reads the encoded value from the checkpoint and installs it as the current value.
+    */
+   public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+     byte[] encoded = checkpointReader.getData().toByteArray();
+     T restored = elemCoder.decode(new ByteArrayInputStream(encoded), Coder.Context.OUTER);
+     set(restored);
+   }
+ }
+
+ /**
+  * Watermark hold state that keeps the minimum of all added holds in memory and can write
+  * it to, and read it back from, a checkpoint. Adding a new minimum and clearing are also
+  * reflected in the {@code watermarkHoldAccessor} field of the enclosing instance.
+  */
+ private final class FlinkWatermarkStateInternalImpl
+     implements WatermarkStateInternal, CheckpointableIF {
+
+   private final ByteString stateKey;
+
+   private Instant minimumHold = null;
+
+   public FlinkWatermarkStateInternalImpl(ByteString stateKey) {
+     this.stateKey = stateKey;
+   }
+
+   @Override
+   public void clear() {
+     // Only the content is reset. The object itself stays in the in-memory state map,
+     // since other users may already have a handle on it.
+     watermarkHoldAccessor = null;
+     minimumHold = null;
+   }
+
+   @Override
+   public StateContents<Instant> get() {
+     return new StateContents<Instant>() {
+       @Override
+       public Instant read() {
+         return minimumHold;
+       }
+     };
+   }
+
+   @Override
+   public void add(Instant watermarkHold) {
+     // the first hold, or one earlier than the current minimum, becomes the new minimum.
+     final boolean isNewMinimum = minimumHold == null || minimumHold.isAfter(watermarkHold);
+     if (!isNewMinimum) {
+       return;
+     }
+     minimumHold = watermarkHold;
+     watermarkHoldAccessor = watermarkHold;
+   }
+
+   @Override
+   public StateContents<Boolean> isEmpty() {
+     return new StateContents<Boolean>() {
+       @Override
+       public Boolean read() {
+         return minimumHold == null;
+       }
+     };
+   }
+
+   @Override
+   public String toString() {
+     return String.valueOf(minimumHold);
+   }
+
+   @Override
+   public boolean shouldPersist() {
+     return !(minimumHold == null);
+   }
+
+   @Override
+   public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+     if (minimumHold == null) {
+       // no hold, no entry in the checkpoint.
+       return;
+     }
+     // entry layout: type marker, state key, hold timestamp.
+     StateCheckpointWriter entry = checkpointBuilder.addWatermarkHoldsBuilder();
+     entry.setTag(stateKey).setTimestamp(minimumHold);
+   }
+
+   /**
+    * Reads the hold from the checkpoint and adds it to this state.
+    */
+   public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+     add(checkpointReader.getTimestamp());
+   }
+ }
+
+ private final class FlinkInMemoryCombiningValue<InputT, AccumT, OutputT>
+ implements CombiningValueStateInternal<InputT, AccumT, OutputT>, CheckpointableIF {
+
+ private final ByteString stateKey;
+ private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+ private final Coder<AccumT> accumCoder;
+
+ private AccumT accum;
+ private boolean isCleared = true;
+
+ private FlinkInMemoryCombiningValue(ByteString stateKey,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
+ Coder<AccumT> accumCoder) {
+ Preconditions.checkNotNull(combineFn);
+ Preconditions.checkNotNull(accumCoder);
+
+ this.stateKey = stateKey;
+ this.combineFn = combineFn;
+ this.accumCoder = accumCoder;
+ accum = combineFn.createAccumulator();
+ }
+
+ @Override
+ public void clear() {
+ accum = combineFn.createAccumulator();
+ isCleared = true;
+ }
+
+ @Override
+ public StateContents<OutputT> get() {
+ return new StateContents<OutputT>() {
+ @Override
+ public OutputT read() {
+ return combineFn.extractOutput(accum);
+ }
+ };
+ }
+
+ @Override
+ public void add(InputT input) {
+ isCleared = false;
+ accum = combineFn.addInput(accum, input);
+ }
+
+ @Override
+ public StateContents<AccumT> getAccum() {
+ return new StateContents<AccumT>() {
+ @Override
+ public AccumT read() {
+ return accum;
+ }
+ };
+ }
+
+ @Override
+ public StateContents<Boolean> isEmpty() {
+ return new StateContents<Boolean>() {
+ @Override
+ public Boolean read() {
+ return isCleared;
+ }
+ };
+ }
+
+ @Override
+ public void addAccum(AccumT accum) {
+ isCleared = false;
+ this.accum = combineFn.mergeAccumulators(Arrays.asList(this.accum, accum));
+ }
+
+ @Override
+ public boolean shouldPersist() {
+ return accum != null;
+ }
+
+ @Override
+ public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+ if (accum != null) {
+
+ // serialize the coder.
+ byte[] coder = InstantiationUtil.serializeObject(accumCoder);
+
+ // encode the accumulator into a ByteString
+ ByteString.Output stream = ByteString.newOutput();
+ accumCoder.encode(accum, stream, Coder.Context.OUTER);
+ ByteString data = stream.toByteString();
+
+ // put the flag that the next serialized element is an accumulator
+ checkpointBuilder.addAccumulatorBuilder()
+ .setTag(stateKey)
+ .setData(coder)
+ .setData(data);
+ }
+ }
+
+ public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+ ByteString valueContent = checkpointReader.getData();
+ AccumT accum = this.accumCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
+ addAccum(accum);
+ }
+ }
+
+ /**
+  * Bag state that keeps its elements in an in-memory list and can write them to, and read
+  * them back from, a checkpoint. An empty bag is not persisted.
+  */
+ private static final class FlinkInMemoryBag<T> implements BagState<T>, CheckpointableIF {
+   private final List<T> contents = new ArrayList<>();
+
+   private final ByteString stateKey;
+   private final Coder<T> elemCoder;
+
+   public FlinkInMemoryBag(ByteString stateKey, Coder<T> elemCoder) {
+     this.stateKey = stateKey;
+     this.elemCoder = elemCoder;
+   }
+
+   @Override
+   public void clear() {
+     contents.clear();
+   }
+
+   @Override
+   public StateContents<Iterable<T>> get() {
+     // the handle exposes the backing list itself, not a copy.
+     return new StateContents<Iterable<T>>() {
+       @Override
+       public Iterable<T> read() {
+         return contents;
+       }
+     };
+   }
+
+   @Override
+   public void add(T input) {
+     contents.add(input);
+   }
+
+   @Override
+   public StateContents<Boolean> isEmpty() {
+     return new StateContents<Boolean>() {
+       @Override
+       public Boolean read() {
+         return contents.isEmpty();
+       }
+     };
+   }
+
+   @Override
+   public boolean shouldPersist() {
+     return !contents.isEmpty();
+   }
+
+   @Override
+   public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+     if (contents.isEmpty()) {
+       // an empty bag leaves no entry in the checkpoint.
+       return;
+     }
+
+     // the coder is stored next to the data.
+     byte[] serializedCoder = InstantiationUtil.serializeObject(elemCoder);
+
+     // entry header: type marker, state key, serialized coder, number of elements.
+     StateCheckpointWriter entry = checkpointBuilder.addListUpdatesBuilder().setTag(stateKey);
+     entry.setData(serializedCoder).writeInt(contents.size());
+
+     // followed by the elements, each one encoded on its own.
+     for (T item : contents) {
+       ByteString.Output encoded = ByteString.newOutput();
+       elemCoder.encode(item, encoded, Coder.Context.OUTER);
+       checkpointBuilder.setData(encoded.toByteString());
+     }
+   }
+
+   /**
+    * Reads the number of elements and then that many encoded elements from the checkpoint,
+    * adding each of them to this bag.
+    */
+   public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+     int remaining = checkpointReader.getInt();
+     while (remaining > 0) {
+       byte[] encoded = checkpointReader.getData().toByteArray();
+       T restored = elemCoder.decode(new ByteArrayInputStream(encoded), Coder.Context.OUTER);
+       add(restored);
+       remaining--;
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
new file mode 100644
index 0000000..ba8ef89
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state;
+
+import com.dataartisans.flink.dataflow.translation.types.CoderTypeSerializer;
+import com.google.protobuf.ByteString;
+import org.apache.flink.core.memory.DataInputView;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reads the primitives a state checkpoint is made of (tags, data blocks, numbers,
+ * timestamps and coder-serialized objects) from a {@link DataInputView}.
+ *
+ * <p>Tags and data blocks are length-prefixed byte arrays; timestamps are stored as a
+ * {@code long} holding microseconds and are converted back to milliseconds when read.
+ */
+public class StateCheckpointReader {
+
+ private final DataInputView input;
+
+ public StateCheckpointReader(DataInputView in) {
+   this.input = in;
+ }
+
+ /** Reads a length-prefixed tag. */
+ public ByteString getTag() throws IOException {
+   return ByteString.copyFrom(readRawData());
+ }
+
+ /** Reads a tag that was written as a UTF string. */
+ public String getTagToString() throws IOException {
+   validate();
+   return input.readUTF();
+ }
+
+ /** Reads a length-prefixed block of data. */
+ public ByteString getData() throws IOException {
+   return ByteString.copyFrom(readRawData());
+ }
+
+ public int getInt() throws IOException {
+   validate();
+   return input.readInt();
+ }
+
+ public byte getByte() throws IOException {
+   validate();
+   return input.readByte();
+ }
+
+ /** Reads a timestamp stored in microseconds and returns it with millisecond precision. */
+ public Instant getTimestamp() throws IOException {
+   validate();
+   long watermarkMicros = input.readLong();
+   return new Instant(TimeUnit.MICROSECONDS.toMillis(watermarkMicros));
+ }
+
+ public <K> K deserializeKey(CoderTypeSerializer<K> keySerializer) throws IOException {
+   return deserializeObject(keySerializer);
+ }
+
+ public <T> T deserializeObject(CoderTypeSerializer<T> objectSerializer) throws IOException {
+   validate();
+   return objectSerializer.deserialize(input);
+ }
+
+ ///////// Helper Methods ///////
+
+ /**
+  * Reads a length-prefixed byte array.
+  *
+  * @throws RuntimeException if the stored length is negative or the input ends before the
+  *     announced number of bytes could be read.
+  */
+ private byte[] readRawData() throws IOException {
+   validate();
+   int size = input.readInt();
+   if (size < 0) {
+     throw new RuntimeException(
+         "Error while deserializing checkpoint. Invalid data length " + size + ".");
+   }
+
+   byte[] serData = new byte[size];
+   try {
+     // read(byte[]) is allowed to return fewer bytes than requested even though more are
+     // still to come; readFully only gives up when the input is really exhausted.
+     input.readFully(serData);
+   } catch (java.io.EOFException e) {
+     throw new RuntimeException(
+         "Error while deserializing checkpoint. Not enough bytes in the input stream.", e);
+   }
+   return serData;
+ }
+
+ private void validate() {
+   if (this.input == null) {
+     throw new RuntimeException("StateBackend not initialized yet.");
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
new file mode 100644
index 0000000..6bc8662
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state;
+
+import com.dataartisans.flink.dataflow.translation.types.CoderTypeSerializer;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.TimeDomain;
+import com.google.cloud.dataflow.sdk.util.TimerInternals;
+import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
+import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Helpers that write the per-key state and the per-key timers into a checkpoint and read
+ * them back.
+ *
+ * <p>Both are stored as the number of keys followed, for every key, by the serialized key
+ * and the data belonging to it.
+ */
+public class StateCheckpointUtils {
+
+ /**
+  * Writes the state of all keys into the checkpoint.
+  *
+  * @param perKeyStateInternals the state to persist, by key.
+  * @param writer the checkpoint to write to.
+  * @param keyCoder the coder used to serialize the keys.
+  * @throws IOException if writing to the checkpoint fails.
+  */
+ public static <K> void encodeState(Map<K, FlinkStateInternals<K>> perKeyStateInternals,
+     StateCheckpointWriter writer, Coder<K> keyCoder) throws IOException {
+   CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+
+   int noOfKeys = perKeyStateInternals.size();
+   writer.writeInt(noOfKeys);
+   for (Map.Entry<K, FlinkStateInternals<K>> keyStatePair : perKeyStateInternals.entrySet()) {
+     K key = keyStatePair.getKey();
+     FlinkStateInternals<K> state = keyStatePair.getValue();
+
+     // encode the key
+     writer.serializeKey(key, keySerializer);
+
+     // write the associated state
+     state.persistState(writer);
+   }
+ }
+
+ /**
+  * Reads the state of all keys from the checkpoint.
+  *
+  * @param reader the checkpoint to read from.
+  * @param combineFn the combine function handed to the restored state holders.
+  * @param keyCoder the coder used to deserialize the keys.
+  * @param windowCoder the window coder handed to the restored state holders.
+  * @param classLoader the class loader used to deserialize the stored coders.
+  * @return the restored state, by key.
+  * @throws IOException if reading from the checkpoint fails.
+  * @throws ClassNotFoundException if a stored coder class cannot be resolved.
+  */
+ public static <K> Map<K, FlinkStateInternals<K>> decodeState(
+     StateCheckpointReader reader,
+     Combine.KeyedCombineFn<K, ?, ?, ?> combineFn,
+     Coder<K> keyCoder,
+     Coder<? extends BoundedWindow> windowCoder,
+     ClassLoader classLoader) throws IOException, ClassNotFoundException {
+
+   int noOfKeys = reader.getInt();
+   Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(noOfKeys);
+
+   CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+   for (int i = 0; i < noOfKeys; i++) {
+
+     // decode the key.
+     K key = reader.deserializeKey(keySerializer);
+
+     //decode the state associated to the key.
+     FlinkStateInternals<K> stateForKey =
+         new FlinkStateInternals<>(key, keyCoder, windowCoder, combineFn);
+     stateForKey.restoreState(reader, classLoader);
+     perKeyStateInternals.put(key, stateForKey);
+   }
+   return perKeyStateInternals;
+ }
+
+ ////////////// Encoding/Decoding the Timers ////////////////
+
+ /**
+  * Writes the timers of all keys into the checkpoint.
+  *
+  * @param allTimers the timers to persist, by key.
+  * @param writer the checkpoint to write to.
+  * @param keyCoder the coder used to serialize the keys.
+  * @throws IOException if writing to the checkpoint fails.
+  */
+ public static <K> void encodeTimers(Map<K, Set<TimerInternals.TimerData>> allTimers,
+     StateCheckpointWriter writer,
+     Coder<K> keyCoder) throws IOException {
+   CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+
+   int noOfKeys = allTimers.size();
+   writer.writeInt(noOfKeys);
+   for (Map.Entry<K, Set<TimerInternals.TimerData>> timersPerKey : allTimers.entrySet()) {
+     K key = timersPerKey.getKey();
+
+     // encode the key
+     writer.serializeKey(key, keySerializer);
+
+     // write the associated timers
+     Set<TimerInternals.TimerData> timers = timersPerKey.getValue();
+     encodeTimerDataForKey(writer, timers);
+   }
+ }
+
+ /**
+  * Reads the timers of all keys from the checkpoint.
+  *
+  * @param reader the checkpoint to read from.
+  * @param windowCoder the window coder used to parse the timer namespaces.
+  * @param keyCoder the coder used to deserialize the keys.
+  * @return the restored timers, by key.
+  * @throws IOException if reading from the checkpoint fails or it holds an unknown time domain.
+  */
+ public static <K> Map<K, Set<TimerInternals.TimerData>> decodeTimers(
+     StateCheckpointReader reader,
+     Coder<? extends BoundedWindow> windowCoder,
+     Coder<K> keyCoder) throws IOException {
+
+   int noOfKeys = reader.getInt();
+   Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(noOfKeys);
+
+   CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+   for (int i = 0; i < noOfKeys; i++) {
+
+     // decode the key.
+     K key = reader.deserializeKey(keySerializer);
+
+     // decode the associated timers.
+     Set<TimerInternals.TimerData> timers = decodeTimerDataForKey(reader, windowCoder);
+     activeTimers.put(key, timers);
+   }
+   return activeTimers;
+ }
+
+ /**
+  * Writes the timers of one key: their number and then, per timer, the namespace string
+  * key, the timestamp and the ordinal of the time domain.
+  */
+ private static void encodeTimerDataForKey(StateCheckpointWriter writer, Set<TimerInternals.TimerData> timers) throws IOException {
+   // encode timers
+   writer.writeInt(timers.size());
+   for (TimerInternals.TimerData timer : timers) {
+     String stringKey = timer.getNamespace().stringKey();
+
+     writer.setTag(stringKey);
+     writer.setTimestamp(timer.getTimestamp());
+     // NOTE(review): the ordinal ties the checkpoint format to the declaration order of
+     // TimeDomain; it is kept because the reading side expects it.
+     writer.writeInt(timer.getDomain().ordinal());
+   }
+ }
+
+ /**
+  * Reads the timers of one key, in the layout written by {@link #encodeTimerDataForKey}.
+  *
+  * @throws IOException if reading fails or a stored time domain ordinal is out of range.
+  */
+ private static Set<TimerInternals.TimerData> decodeTimerDataForKey(
+     StateCheckpointReader reader, Coder<? extends BoundedWindow> windowCoder) throws IOException {
+
+   // values() creates a new array on every call, so it is fetched once.
+   final TimeDomain[] domains = TimeDomain.values();
+
+   // decode the timers: first their number and then the content itself.
+   int noOfTimers = reader.getInt();
+   Set<TimerInternals.TimerData> timers = new HashSet<>(noOfTimers);
+   for (int i = 0; i < noOfTimers; i++) {
+     String stringKey = reader.getTagToString();
+     Instant instant = reader.getTimestamp();
+
+     int domainOrdinal = reader.getInt();
+     if (domainOrdinal < 0 || domainOrdinal >= domains.length) {
+       throw new IOException(
+           "Invalid checkpoint: unknown TimeDomain ordinal " + domainOrdinal + ".");
+     }
+     TimeDomain domain = domains[domainOrdinal];
+
+     StateNamespace namespace = StateNamespaces.fromString(stringKey, windowCoder);
+     timers.add(TimerInternals.TimerData.of(namespace, instant, domain));
+   }
+   return timers;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
new file mode 100644
index 0000000..7201112
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state;
+
+import com.dataartisans.flink.dataflow.translation.types.CoderTypeSerializer;
+import com.google.protobuf.ByteString;
+import org.apache.flink.runtime.state.StateBackend;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Writes the primitives a state checkpoint is made of (type markers, tags, data blocks,
+ * numbers, timestamps and coder-serialized objects) to a checkpoint output view.
+ *
+ * <p>All writing methods return this writer so that calls can be chained. Every one of
+ * them fails with a {@link RuntimeException} if the writer was created without an output.
+ */
+public class StateCheckpointWriter {
+
+ private final StateBackend.CheckpointStateOutputView output;
+
+ public static StateCheckpointWriter create(StateBackend.CheckpointStateOutputView output) {
+   return new StateCheckpointWriter(output);
+ }
+
+ private StateCheckpointWriter(StateBackend.CheckpointStateOutputView output) {
+   this.output = output;
+ }
+
+ ///////// Creating the serialized versions of the different types of state held by dataflow ///////
+
+ /** Starts a value entry by writing its type marker. */
+ public StateCheckpointWriter addValueBuilder() throws IOException {
+   validate();
+   StateType.serialize(StateType.VALUE, this);
+   return this;
+ }
+
+ /** Starts a watermark hold entry by writing its type marker. */
+ public StateCheckpointWriter addWatermarkHoldsBuilder() throws IOException {
+   validate();
+   StateType.serialize(StateType.WATERMARK, this);
+   return this;
+ }
+
+ /** Starts a list entry by writing its type marker. */
+ public StateCheckpointWriter addListUpdatesBuilder() throws IOException {
+   validate();
+   StateType.serialize(StateType.LIST, this);
+   return this;
+ }
+
+ /** Starts an accumulator entry by writing its type marker. */
+ public StateCheckpointWriter addAccumulatorBuilder() throws IOException {
+   validate();
+   StateType.serialize(StateType.ACCUMULATOR, this);
+   return this;
+ }
+
+ ///////// Setting the tag for a given state element ///////
+
+ /** Writes the tag as a length-prefixed byte array. */
+ public StateCheckpointWriter setTag(ByteString stateKey) throws IOException {
+   return writeData(stateKey.toByteArray());
+ }
+
+ /** Writes the tag as a UTF string. */
+ public StateCheckpointWriter setTag(String stateKey) throws IOException {
+   validate();
+   output.writeUTF(stateKey);
+   return this;
+ }
+
+
+ public <K> StateCheckpointWriter serializeKey(K key, CoderTypeSerializer<K> keySerializer) throws IOException {
+   return serializeObject(key, keySerializer);
+ }
+
+ public <T> StateCheckpointWriter serializeObject(T object, CoderTypeSerializer<T> objectSerializer) throws IOException {
+   validate();
+   objectSerializer.serialize(object, output);
+   return this;
+ }
+
+ ///////// Write the actual serialized data //////////
+
+ public StateCheckpointWriter setData(ByteString data) throws IOException {
+   return writeData(data.toByteArray());
+ }
+
+ public StateCheckpointWriter setData(byte[] data) throws IOException {
+   return writeData(data);
+ }
+
+ /** Writes the timestamp as a {@code long} holding microseconds. */
+ public StateCheckpointWriter setTimestamp(Instant timestamp) throws IOException {
+   validate();
+   // NOTE(review): TimeUnit conversions saturate at Long.MAX_VALUE / Long.MIN_VALUE, so
+   // timestamps beyond the microsecond range would not survive a round trip -- confirm
+   // that callers never pass such values.
+   output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis()));
+   return this;
+ }
+
+ public StateCheckpointWriter writeInt(int number) throws IOException {
+   validate();
+   output.writeInt(number);
+   return this;
+ }
+
+ public StateCheckpointWriter writeByte(byte b) throws IOException {
+   validate();
+   output.writeByte(b);
+   return this;
+ }
+
+ ///////// Helper Methods ///////
+
+ /** Writes the length of the array followed by its bytes. */
+ private StateCheckpointWriter writeData(byte[] data) throws IOException {
+   validate();
+   output.writeInt(data.length);
+   output.write(data);
+   return this;
+ }
+
+ private void validate() {
+   if (this.output == null) {
+     throw new RuntimeException("StateBackend not initialized yet.");
+   }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
new file mode 100644
index 0000000..11446ea
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state;
+
+import java.io.IOException;
+
+/**
+ * The kinds of state entries found in a checkpoint. Every kind is written as a single
+ * byte holding its numeric value.
+ */
+public enum StateType {
+
+ VALUE(0),
+
+ WATERMARK(1),
+
+ LIST(2),
+
+ ACCUMULATOR(3);
+
+ // the value written into the checkpoint; it has to be unique and fit into a byte.
+ private final int numVal;
+
+ StateType(int value) {
+   this.numVal = value;
+ }
+
+ /**
+  * Writes the marker byte of the given type.
+  *
+  * @throws IllegalArgumentException if {@code output} is null.
+  * @throws RuntimeException if the numeric value of the type cannot be stored in the
+  *     marker byte.
+  */
+ public static void serialize(StateType type, StateCheckpointWriter output) throws IOException {
+   if (output == null) {
+     throw new IllegalArgumentException("Cannot write to a null output.");
+   }
+
+   // derived from the byte range instead of a hard-coded number of constants, so that
+   // adding a constant does not require touching this check.
+   if (type.numVal < 0 || type.numVal > Byte.MAX_VALUE) {
+     throw new RuntimeException("Unknown State Type " + type + ".");
+   }
+
+   output.writeByte((byte) type.numVal);
+ }
+
+ /**
+  * Reads a marker byte and returns the type it stands for.
+  *
+  * @throws IllegalArgumentException if {@code input} is null.
+  * @throws RuntimeException if the byte does not match any type.
+  */
+ public static StateType deserialize(StateCheckpointReader input) throws IOException {
+   if (input == null) {
+     throw new IllegalArgumentException("Cannot read from a null input.");
+   }
+
+   int typeInt = (int) input.getByte();
+   for (StateType st : values()) {
+     if (st.numVal == typeInt) {
+       return st;
+     }
+   }
+   // nothing matched: fail instead of handing a null type to the caller.
+   throw new RuntimeException("Unknown State Type " + typeInt + ".");
+ }
+}