You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/04/19 13:09:14 UTC
[05/18] beam git commit: [BEAM-1994] Remove Flink examples package
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java
new file mode 100644
index 0000000..e24bf31
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Flink {@link TypeInformation} describing Beam values that a {@link Coder} has already turned
+ * into a {@code byte[]}.
+ *
+ * <p>The type carries no state: any two instances compare equal. It is reported as a key type,
+ * and comparisons are delegated to {@link EncodedValueComparator}.
+ */
+public class EncodedValueTypeInformation
+    extends TypeInformation<byte[]>
+    implements AtomicType<byte[]> {
+
+  private static final long serialVersionUID = 1L;
+
+  /** The payload is always a raw {@code byte[]}. */
+  @Override
+  public Class<byte[]> getTypeClass() {
+    return byte[].class;
+  }
+
+  /** Encoded bytes are not treated as one of Flink's basic types. */
+  @Override
+  public boolean isBasicType() {
+    return false;
+  }
+
+  /** Encoded bytes are opaque, never a tuple. */
+  @Override
+  public boolean isTupleType() {
+    return false;
+  }
+
+  /** No fields are reported for the opaque byte payload. */
+  @Override
+  public int getArity() {
+    return 0;
+  }
+
+  /** No fields are reported for the opaque byte payload. */
+  @Override
+  public int getTotalFields() {
+    return 0;
+  }
+
+  /** Encoded values may be used as keys. */
+  @Override
+  public boolean isKeyType() {
+    return true;
+  }
+
+  @Override
+  public TypeSerializer<byte[]> createSerializer(ExecutionConfig config) {
+    return new EncodedValueSerializer();
+  }
+
+  @Override
+  public TypeComparator<byte[]> createComparator(boolean ascending, ExecutionConfig config) {
+    return new EncodedValueComparator(ascending);
+  }
+
+  @Override
+  public boolean canEqual(Object candidate) {
+    return candidate instanceof EncodedValueTypeInformation;
+  }
+
+  @Override
+  public boolean equals(Object candidate) {
+    return candidate instanceof EncodedValueTypeInformation;
+  }
+
+  @Override
+  public int hashCode() {
+    return getClass().hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "EncodedValueTypeInformation";
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
new file mode 100644
index 0000000..36b5ba3
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import java.io.ByteArrayOutputStream;
+
+/**
+ * A {@link java.io.ByteArrayOutputStream} that exposes its internal buffer, so callers can read
+ * the written bytes without the array copy that {@link #toByteArray()} makes.
+ */
+public class InspectableByteArrayOutputStream extends ByteArrayOutputStream {
+
+  /**
+   * Returns the live internal buffer, not a copy.
+   *
+   * <p>Only the first {@link #size()} bytes hold written data; anything after that is unused
+   * capacity. Later writes may replace the buffer with a larger array, so fetch it again after
+   * writing.
+   *
+   * @return the stream's backing array
+   */
+  public byte[] getBuffer() {
+    return this.buf;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java
new file mode 100644
index 0000000..9df6836
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+/**
+ * {@link KeySelector} that extracts the key from a {@link KV} and returns
+ * it in encoded form as a {@code byte} array.
+ */
+public class KvKeySelector<InputT, K>
+ implements KeySelector<WindowedValue<KV<K, InputT>>, byte[]>, ResultTypeQueryable<byte[]> {
+
+ private final Coder<K> keyCoder;
+
+ public KvKeySelector(Coder<K> keyCoder) {
+ this.keyCoder = keyCoder;
+ }
+
+ @Override
+ public byte[] getKey(WindowedValue<KV<K, InputT>> value) throws Exception {
+ return CoderUtils.encodeToByteArray(keyCoder, value.getValue().getKey());
+ }
+
+ @Override
+ public TypeInformation<byte[]> getProducedType() {
+ return new EncodedValueTypeInformation();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
new file mode 100644
index 0000000..6fb3182
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.types;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
new file mode 100644
index 0000000..2256bb1
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink.translation.utils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.IOChannelUtils;
+
+/**
+ * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+ private final byte[] serializedOptions;
+
+ /** Lazily initialized copy of deserialized options. */
+ private transient PipelineOptions pipelineOptions;
+
+ public SerializedPipelineOptions(PipelineOptions options) {
+ checkNotNull(options, "PipelineOptions must not be null.");
+
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ new ObjectMapper().writeValue(baos, options);
+ this.serializedOptions = baos.toByteArray();
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+ }
+
+ }
+
+ public PipelineOptions getPipelineOptions() {
+ if (pipelineOptions == null) {
+ try {
+ pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+
+ IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
+ FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+ }
+ }
+
+ return pipelineOptions;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
new file mode 100644
index 0000000..5dedd53
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.utils;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
new file mode 100644
index 0000000..82a2c4e
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.flink.core.memory.DataInputView;
+
+/**
+ * Wrapper for {@link DataInputView}. We need this because Flink reads data using a
+ * {@link org.apache.flink.core.memory.DataInputView} while
+ * Dataflow {@link org.apache.beam.sdk.coders.Coder}s expect an
+ * {@link java.io.InputStream}.
+ */
+public class DataInputViewWrapper extends InputStream {
+
+ private DataInputView inputView;
+
+ public DataInputViewWrapper(DataInputView inputView) {
+ this.inputView = inputView;
+ }
+
+ public void setInputView(DataInputView inputView) {
+ this.inputView = inputView;
+ }
+
+ @Override
+ public int read() throws IOException {
+ try {
+ return inputView.readUnsignedByte();
+ } catch (EOFException e) {
+ // translate between DataInput and InputStream,
+ // DataInput signals EOF by exception, InputStream does it by returning -1
+ return -1;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return inputView.read(b, off, len);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
new file mode 100644
index 0000000..f2d9db2
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * Adapts a Flink {@link org.apache.flink.core.memory.DataOutputView} to a
+ * {@link java.io.OutputStream}. We need this because Flink writes data using a
+ * {@link org.apache.flink.core.memory.DataOutputView} while Beam
+ * {@link org.apache.beam.sdk.coders.Coder}s expect an {@link java.io.OutputStream}.
+ *
+ * <p>The wrapped view can be swapped with {@link #setOutputView} so one wrapper can be reused.
+ */
+public class DataOutputViewWrapper extends OutputStream {
+
+  /** The Flink view all writes are forwarded to. */
+  private DataOutputView outputView;
+
+  public DataOutputViewWrapper(DataOutputView outputView) {
+    this.outputView = outputView;
+  }
+
+  /** Replaces the view that subsequent writes delegate to. */
+  public void setOutputView(DataOutputView outputView) {
+    this.outputView = outputView;
+  }
+
+  /** Forwards a single-byte write to the wrapped view. */
+  @Override
+  public void write(int b) throws IOException {
+    outputView.write(b);
+  }
+
+  /** Forwards a bulk write of {@code len} bytes starting at {@code off} to the wrapped view. */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    outputView.write(b, off, len);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
new file mode 100644
index 0000000..70d97e3
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.flink.api.common.accumulators.Accumulator;
+
+/**
+ * Wrapper that wraps a {@link org.apache.beam.sdk.transforms.Combine.CombineFn}
+ * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using
+ * the function as an aggregator in a {@link org.apache.beam.sdk.transforms.ParDo}
+ * operation.
+ *
+ * <p>The running value is stored as the combiner's output, and it is passed back into the
+ * combiner as an input on every {@link #add}, {@link #merge} and {@link #clone}. This presumably
+ * relies on the combiner's input and output types being interchangeable (e.g. a sum). TODO
+ * confirm for all callers.
+ */
+public class SerializableFnAggregatorWrapper<InputT, OutputT>
+    implements Aggregator<InputT, OutputT>, Accumulator<InputT, Serializable> {
+
+  /** Current aggregated value; holds the combiner's latest output. */
+  private OutputT aa;
+
+  /** Combine function used to compute every new aggregated value. */
+  private final Combine.CombineFn<InputT, ?, OutputT> combiner;
+
+  public SerializableFnAggregatorWrapper(Combine.CombineFn<InputT, ?, OutputT> combiner) {
+    this.combiner = combiner;
+    resetLocal();
+  }
+
+  /** Combines the current value with {@code value}. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void add(InputT value) {
+    this.aa = combiner.apply(ImmutableList.of((InputT) aa, value));
+  }
+
+  /**
+   * Returns the current aggregated value.
+   *
+   * @throws ClassCastException if the combiner's output is not {@link Serializable}
+   */
+  @Override
+  public Serializable getLocalValue() {
+    return (Serializable) aa;
+  }
+
+  /** Resets the value to the combiner's result for an empty input. */
+  @Override
+  public void resetLocal() {
+    this.aa = combiner.apply(ImmutableList.<InputT>of());
+  }
+
+  /** Combines the current value with the other accumulator's local value. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void merge(Accumulator<InputT, Serializable> other) {
+    this.aa = combiner.apply(ImmutableList.of((InputT) aa, (InputT) other.getLocalValue()));
+  }
+
+  /** Beam {@link Aggregator} entry point; same as {@link #add}. */
+  @Override
+  public void addValue(InputT value) {
+    add(value);
+  }
+
+  @Override
+  public String getName() {
+    return "Aggregator :" + combiner.toString();
+  }
+
+  @Override
+  public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() {
+    return combiner;
+  }
+
+  /**
+   * Returns a copy of this accumulator.
+   *
+   * <p>The combiner is shared with the copy. The copy's aggregated value is produced by passing
+   * the current value through the combiner once, so the two instances do not hold the same value
+   * reference (unless the combiner returns its input unchanged).
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public Accumulator<InputT, Serializable> clone() {
+    SerializableFnAggregatorWrapper<InputT, OutputT> result;
+    try {
+      // Reuse the field-for-field copy from Object.clone() instead of discarding it and running
+      // the constructor, which would call combiner.apply(empty) only to overwrite the result.
+      result = (SerializableFnAggregatorWrapper<InputT, OutputT>) super.clone();
+    } catch (CloneNotSupportedException e) {
+      // Flink Accumulators cannot throw CloneNotSupportedException, work around that.
+      throw new RuntimeException(e);
+    }
+
+    // copy the value by merging it through the combiner
+    result.aa = combiner.apply(Lists.newArrayList((InputT) aa));
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
new file mode 100644
index 0000000..a87472b
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Wrapper for executing a {@link Source} as a Flink {@link InputFormat}.
+ *
+ * <p>{@link #createInputSplits} splits the initial {@link BoundedSource} into
+ * {@link SourceInputSplit}s. {@link #open} creates a reader for one split. Every record read is
+ * emitted in the {@link GlobalWindow} with {@link PaneInfo#NO_FIRING}.
+ */
+public class SourceInputFormat<T>
+    implements InputFormat<WindowedValue<T>, SourceInputSplit<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);
+
+  /** The unsplit source, used for size estimates and to create the splits. */
+  private final BoundedSource<T> initialSource;
+
+  /** Options restored from {@link #serializedOptions} in {@link #configure}. */
+  private transient PipelineOptions options;
+  private final SerializedPipelineOptions serializedOptions;
+
+  /** Reader for the currently open split; null before {@link #open} and after {@link #close}. */
+  private transient BoundedSource.BoundedReader<T> reader;
+
+  /** True while {@link #reader} is positioned on a record that has not been emitted yet. */
+  private boolean inputAvailable = false;
+
+  public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
+    this.initialSource = initialSource;
+    this.serializedOptions = new SerializedPipelineOptions(options);
+  }
+
+  /** Restores the pipeline options shipped with this format. */
+  @Override
+  public void configure(Configuration configuration) {
+    options = serializedOptions.getPipelineOptions();
+  }
+
+  /** Opens a reader on the split's source and positions it on the first record, if any. */
+  @Override
+  public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
+    // Splits are created by createInputSplits from initialSource, so the carried source is
+    // expected to be a BoundedSource.
+    reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
+    inputAvailable = reader.start();
+  }
+
+  /**
+   * Reports the initial source's estimated size in bytes. The record count and average record
+   * width are reported as unknown.
+   *
+   * @return the statistics, or null if the size estimate could not be obtained
+   */
+  @Override
+  public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
+    try {
+      final long estimatedSize = initialSource.getEstimatedSizeBytes(options);
+
+      return new BaseStatistics() {
+        @Override
+        public long getTotalInputSize() {
+          return estimatedSize;
+        }
+
+        @Override
+        public long getNumberOfRecords() {
+          return BaseStatistics.NUM_RECORDS_UNKNOWN;
+        }
+
+        @Override
+        public float getAverageRecordWidth() {
+          return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
+        }
+      };
+    } catch (Exception e) {
+      // Pass the exception as the trailing throwable (no "{}" placeholder) so SLF4J logs its
+      // stack trace; a placeholder would consume it and print only its toString().
+      LOG.warn("Could not read Source statistics.", e);
+    }
+
+    return null;
+  }
+
+  /**
+   * Splits the initial source into bundles of about {@code estimatedSize / numSplits} bytes. Each
+   * resulting source becomes one split, numbered by its position.
+   *
+   * @throws IOException wrapping any failure while estimating the size or splitting
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
+    try {
+      long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
+      List<? extends Source<T>> shards =
+          initialSource.split(desiredSizeBytes, options);
+      int numShards = shards.size();
+      SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards];
+      for (int i = 0; i < numShards; i++) {
+        sourceInputSplits[i] = new SourceInputSplit<>(shards.get(i), i);
+      }
+      return sourceInputSplits;
+    } catch (Exception e) {
+      throw new IOException("Could not create input splits from Source.", e);
+    }
+  }
+
+  @Override
+  public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) {
+    return new DefaultInputSplitAssigner(sourceInputSplits);
+  }
+
+  /** True once the reader has no further record positioned. */
+  @Override
+  public boolean reachedEnd() throws IOException {
+    return !inputAvailable;
+  }
+
+  /**
+   * Returns the current record in the global window and advances the reader. The reuse argument
+   * is ignored.
+   *
+   * @return the next record, or null if no input is available
+   */
+  @Override
+  public WindowedValue<T> nextRecord(WindowedValue<T> t) throws IOException {
+    if (inputAvailable) {
+      final T current = reader.getCurrent();
+      final Instant timestamp = reader.getCurrentTimestamp();
+      // advance reader to have a record ready next time
+      inputAvailable = reader.advance();
+      return WindowedValue.of(
+          current,
+          timestamp,
+          GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+    }
+
+    return null;
+  }
+
+  /** Closes the current reader, if any, and releases the reference to it. */
+  @Override
+  public void close() throws IOException {
+    // TODO null check can be removed once FLINK-3796 is fixed
+    if (reader != null) {
+      try {
+        reader.close();
+      } finally {
+        // Drop the reference so a repeated close() does not close the same reader twice.
+        reader = null;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java
new file mode 100644
index 0000000..e4a7386
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import org.apache.beam.sdk.io.Source;
+import org.apache.flink.core.io.InputSplit;
+
+/**
+ * Flink {@link org.apache.flink.core.io.InputSplit} used by
+ * {@link org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat}.
+ *
+ * <p>Each split carries one of the {@link Source}s produced by splitting the original source,
+ * because Beam sources shard by splitting into further sources. Flink, by contrast, has an
+ * InputFormat create separate InputSplit objects.
+ *
+ * @param <T> element type produced by the carried source
+ */
+public class SourceInputSplit<T> implements InputSplit {
+
+  /** The shard of the original source that this split reads. */
+  private Source<T> source;
+
+  /** Position of this split among the splits created from the original source. */
+  private int splitNumber;
+
+  /**
+   * Creates an empty split: the source is null and the split number is 0. Presumably needed by
+   * serialization frameworks. TODO confirm.
+   */
+  public SourceInputSplit() {
+  }
+
+  /** Creates a split that reads {@code shard} and is identified by {@code index}. */
+  public SourceInputSplit(Source<T> shard, int index) {
+    this.source = shard;
+    this.splitNumber = index;
+  }
+
+  /** Returns the source shard carried by this split. */
+  public Source<T> getSource() {
+    return this.source;
+  }
+
+  @Override
+  public int getSplitNumber() {
+    return this.splitNumber;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
new file mode 100644
index 0000000..72f7deb
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
new file mode 100644
index 0000000..8a09286
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -0,0 +1,774 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.AggregatorFactory;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.StatefulDoFnRunner;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.NullSideInputReader;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.HeapInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.joda.time.Instant;
+
+/**
+ * Flink operator for executing {@link DoFn DoFns}.
+ *
+ * @param <InputT> the input type of the {@link DoFn}
+ * @param <FnOutputT> the output type of the {@link DoFn}
+ * @param <OutputT> the output type of the operator, this can be different from the fn output
+ * type when we have additional tagged outputs
+ */
+public class DoFnOperator<InputT, FnOutputT, OutputT>
+ extends AbstractStreamOperator<OutputT>
+ implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>,
+ TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT>,
+ KeyGroupCheckpointedOperator, Triggerable<Object, TimerData> {
+
+ protected DoFn<InputT, FnOutputT> doFn;
+
+ protected final SerializedPipelineOptions serializedOptions;
+
+ protected final TupleTag<FnOutputT> mainOutputTag;
+ protected final List<TupleTag<?>> additionalOutputTags;
+
+ protected final Collection<PCollectionView<?>> sideInputs;
+ protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
+
+ protected final WindowingStrategy<?, ?> windowingStrategy;
+
+ protected final OutputManagerFactory<OutputT> outputManagerFactory;
+
+ protected transient DoFnRunner<InputT, FnOutputT> doFnRunner;
+ protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner;
+
+ protected transient SideInputHandler sideInputHandler;
+
+ protected transient SideInputReader sideInputReader;
+
+ protected transient DoFnRunners.OutputManager outputManager;
+
+ private transient DoFnInvoker<InputT, FnOutputT> doFnInvoker;
+
+ protected transient long currentInputWatermark;
+
+ protected transient long currentOutputWatermark;
+
+ private transient StateTag<Object, BagState<WindowedValue<InputT>>> pushedBackTag;
+
+ protected transient FlinkStateInternals<?> stateInternals;
+
+ private Coder<WindowedValue<InputT>> inputCoder;
+
+ private final Coder<?> keyCoder;
+
+ private final TimerInternals.TimerDataCoder timerCoder;
+
+ protected transient HeapInternalTimerService<?, TimerInternals.TimerData> timerService;
+
+ protected transient FlinkTimerInternals timerInternals;
+
+ private transient StateInternals<?> pushbackStateInternals;
+
+ private transient Optional<Long> pushedBackWatermark;
+
+ public DoFnOperator(
+ DoFn<InputT, FnOutputT> doFn,
+ Coder<WindowedValue<InputT>> inputCoder,
+ TupleTag<FnOutputT> mainOutputTag,
+ List<TupleTag<?>> additionalOutputTags,
+ OutputManagerFactory<OutputT> outputManagerFactory,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Map<Integer, PCollectionView<?>> sideInputTagMapping,
+ Collection<PCollectionView<?>> sideInputs,
+ PipelineOptions options,
+ Coder<?> keyCoder) {
+ this.doFn = doFn;
+ this.inputCoder = inputCoder;
+ this.mainOutputTag = mainOutputTag;
+ this.additionalOutputTags = additionalOutputTags;
+ this.sideInputTagMapping = sideInputTagMapping;
+ this.sideInputs = sideInputs;
+ this.serializedOptions = new SerializedPipelineOptions(options);
+ this.windowingStrategy = windowingStrategy;
+ this.outputManagerFactory = outputManagerFactory;
+
+ setChainingStrategy(ChainingStrategy.ALWAYS);
+
+ this.keyCoder = keyCoder;
+
+ this.timerCoder =
+ TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+ }
+
+ private ExecutionContext.StepContext createStepContext() {
+ return new StepContext();
+ }
+
+ // allow overriding this in WindowDoFnOperator because this one dynamically creates
+ // the DoFn
+ protected DoFn<InputT, FnOutputT> getDoFn() {
+ return doFn;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ currentInputWatermark = Long.MIN_VALUE;
+ currentOutputWatermark = Long.MIN_VALUE;
+
+ AggregatorFactory aggregatorFactory = new AggregatorFactory() {
+ @Override
+ public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+ Class<?> fnClass,
+ ExecutionContext.StepContext stepContext,
+ String aggregatorName,
+ Combine.CombineFn<InputT, AccumT, OutputT> combine) {
+
+ @SuppressWarnings("unchecked")
+ SerializableFnAggregatorWrapper<InputT, OutputT> result =
+ (SerializableFnAggregatorWrapper<InputT, OutputT>)
+ getRuntimeContext().getAccumulator(aggregatorName);
+
+ if (result == null) {
+ result = new SerializableFnAggregatorWrapper<>(combine);
+ getRuntimeContext().addAccumulator(aggregatorName, result);
+ }
+ return result;
+ }
+ };
+
+ sideInputReader = NullSideInputReader.of(sideInputs);
+
+ if (!sideInputs.isEmpty()) {
+
+ pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
+
+ FlinkBroadcastStateInternals sideInputStateInternals =
+ new FlinkBroadcastStateInternals<>(
+ getContainingTask().getIndexInSubtaskGroup(), getOperatorStateBackend());
+
+ sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
+ sideInputReader = sideInputHandler;
+
+ // maybe init by initializeState
+ if (pushbackStateInternals == null) {
+ if (keyCoder != null) {
+ pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
+ getKeyedStateBackend());
+ } else {
+ pushbackStateInternals =
+ new FlinkSplitStateInternals<Object>(getOperatorStateBackend());
+ }
+ }
+
+ pushedBackWatermark = Optional.absent();
+
+ }
+
+ outputManager = outputManagerFactory.create(output);
+
+ // StatefulPardo or WindowDoFn
+ if (keyCoder != null) {
+ stateInternals = new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(),
+ keyCoder);
+
+ timerService = (HeapInternalTimerService<?, TimerInternals.TimerData>)
+ getInternalTimerService("beam-timer", new CoderTypeSerializer<>(timerCoder), this);
+
+ timerInternals = new FlinkTimerInternals();
+
+ }
+
+ // WindowDoFnOperator need use state and timer to get DoFn.
+ // So must wait StateInternals and TimerInternals ready.
+ this.doFn = getDoFn();
+ doFnInvoker = DoFnInvokers.invokerFor(doFn);
+
+ doFnInvoker.invokeSetup();
+
+ ExecutionContext.StepContext stepContext = createStepContext();
+
+ doFnRunner = DoFnRunners.simpleRunner(
+ serializedOptions.getPipelineOptions(),
+ doFn,
+ sideInputReader,
+ outputManager,
+ mainOutputTag,
+ additionalOutputTags,
+ stepContext,
+ aggregatorFactory,
+ windowingStrategy);
+
+ if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) {
+ // When the doFn is this, we know it came from WindowDoFnOperator and
+ // InputT = KeyedWorkItem<K, V>
+ // OutputT = KV<K, V>
+ //
+ // for some K, V
+
+
+ doFnRunner = DoFnRunners.lateDataDroppingRunner(
+ (DoFnRunner) doFnRunner,
+ stepContext,
+ windowingStrategy,
+ ((GroupAlsoByWindowViaWindowSetNewDoFn) doFn).getDroppedDueToLatenessAggregator());
+ } else if (keyCoder != null) {
+ // It is a stateful DoFn
+
+ StatefulDoFnRunner.CleanupTimer cleanupTimer =
+ new StatefulDoFnRunner.TimeInternalsCleanupTimer(
+ stepContext.timerInternals(), windowingStrategy);
+
+ // we don't know the window type
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ StatefulDoFnRunner.StateCleaner<?> stateCleaner =
+ new StatefulDoFnRunner.StateInternalsStateCleaner<>(
+ doFn, stepContext.stateInternals(), windowCoder);
+
+ doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
+ doFn,
+ doFnRunner,
+ stepContext,
+ aggregatorFactory,
+ windowingStrategy,
+ cleanupTimer,
+ stateCleaner);
+ }
+
+ pushbackDoFnRunner =
+ SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ doFnInvoker.invokeTeardown();
+ }
+
+ protected final long getPushbackWatermarkHold() {
+ // if we don't have side inputs we never hold the watermark
+ if (sideInputs.isEmpty()) {
+ return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+ }
+
+ try {
+ checkInitPushedBackWatermark();
+ return pushedBackWatermark.get();
+ } catch (Exception e) {
+ throw new RuntimeException("Error retrieving pushed back watermark state.", e);
+ }
+ }
+
+ private void checkInitPushedBackWatermark() {
+ // init and restore from pushedBack state.
+ // Not done in initializeState, because OperatorState is not ready.
+ if (!pushedBackWatermark.isPresent()) {
+
+ BagState<WindowedValue<InputT>> pushedBack =
+ pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+ long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+ for (WindowedValue<InputT> value : pushedBack.read()) {
+ min = Math.min(min, value.getTimestamp().getMillis());
+ }
+ setPushedBackWatermark(min);
+ }
+ }
+
+ @Override
+ public final void processElement(
+ StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
+ doFnRunner.startBundle();
+ doFnRunner.processElement(streamRecord.getValue());
+ doFnRunner.finishBundle();
+ }
+
+ private void setPushedBackWatermark(long watermark) {
+ pushedBackWatermark = Optional.fromNullable(watermark);
+ }
+
+ @Override
+ public final void processElement1(
+ StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
+ pushbackDoFnRunner.startBundle();
+ Iterable<WindowedValue<InputT>> justPushedBack =
+ pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue());
+
+ BagState<WindowedValue<InputT>> pushedBack =
+ pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+ checkInitPushedBackWatermark();
+
+ long min = pushedBackWatermark.get();
+ for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
+ min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
+ pushedBack.add(pushedBackValue);
+ }
+ setPushedBackWatermark(min);
+ pushbackDoFnRunner.finishBundle();
+ }
+
+ @Override
+ public final void processElement2(
+ StreamRecord<RawUnionValue> streamRecord) throws Exception {
+ pushbackDoFnRunner.startBundle();
+
+ @SuppressWarnings("unchecked")
+ WindowedValue<Iterable<?>> value =
+ (WindowedValue<Iterable<?>>) streamRecord.getValue().getValue();
+
+ PCollectionView<?> sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag());
+ sideInputHandler.addSideInputValue(sideInput, value);
+
+ BagState<WindowedValue<InputT>> pushedBack =
+ pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+ List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
+
+ Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
+ if (pushedBackContents != null) {
+ for (WindowedValue<InputT> elem : pushedBackContents) {
+
+ // we need to set the correct key in case the operator is
+ // a (keyed) window operator
+ setKeyContextElement1(new StreamRecord<>(elem));
+
+ Iterable<WindowedValue<InputT>> justPushedBack =
+ pushbackDoFnRunner.processElementInReadyWindows(elem);
+ Iterables.addAll(newPushedBack, justPushedBack);
+ }
+ }
+
+ pushedBack.clear();
+ long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+ for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
+ min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
+ pushedBack.add(pushedBackValue);
+ }
+ setPushedBackWatermark(min);
+
+ pushbackDoFnRunner.finishBundle();
+
+ // maybe output a new watermark
+ processWatermark1(new Watermark(currentInputWatermark));
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ processWatermark1(mark);
+ }
+
+ @Override
+ public void processWatermark1(Watermark mark) throws Exception {
+ if (keyCoder == null) {
+ this.currentInputWatermark = mark.getTimestamp();
+ long potentialOutputWatermark =
+ Math.min(getPushbackWatermarkHold(), currentInputWatermark);
+ if (potentialOutputWatermark > currentOutputWatermark) {
+ currentOutputWatermark = potentialOutputWatermark;
+ output.emitWatermark(new Watermark(currentOutputWatermark));
+ }
+ } else {
+ // fireTimers, so we need startBundle.
+ pushbackDoFnRunner.startBundle();
+
+ this.currentInputWatermark = mark.getTimestamp();
+
+ // hold back by the pushed back values waiting for side inputs
+ long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
+
+ timerService.advanceWatermark(actualInputWatermark);
+
+ Instant watermarkHold = stateInternals.watermarkHold();
+
+ long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());
+
+ long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold);
+
+ if (potentialOutputWatermark > currentOutputWatermark) {
+ currentOutputWatermark = potentialOutputWatermark;
+ output.emitWatermark(new Watermark(currentOutputWatermark));
+ }
+ pushbackDoFnRunner.finishBundle();
+ }
+ }
+
+ @Override
+ public void processWatermark2(Watermark mark) throws Exception {
+ // ignore watermarks from the side-input input
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ // copy from AbstractStreamOperator
+ if (getKeyedStateBackend() != null) {
+ KeyedStateCheckpointOutputStream out;
+
+ try {
+ out = context.getRawKeyedOperatorStateOutput();
+ } catch (Exception exception) {
+ throw new Exception("Could not open raw keyed operator state stream for "
+ + getOperatorName() + '.', exception);
+ }
+
+ try {
+ KeyGroupsList allKeyGroups = out.getKeyGroupList();
+ for (int keyGroupIdx : allKeyGroups) {
+ out.startNewKeyGroup(keyGroupIdx);
+
+ DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
+
+ // if (this instanceof KeyGroupCheckpointedOperator)
+ snapshotKeyGroupState(keyGroupIdx, dov);
+
+ // We can't get all timerServices, so we just snapshot our timerService
+ // Maybe this is a normal DoFn that has no timerService
+ if (keyCoder != null) {
+ timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx);
+ }
+
+ }
+ } catch (Exception exception) {
+ throw new Exception("Could not write timer service of " + getOperatorName()
+ + " to checkpoint state stream.", exception);
+ } finally {
+ try {
+ out.close();
+ } catch (Exception closeException) {
+ LOG.warn("Could not close raw keyed operator state stream for {}. This "
+ + "might have prevented deleting some state data.", getOperatorName(),
+ closeException);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
+ if (!sideInputs.isEmpty() && keyCoder != null) {
+ ((FlinkKeyGroupStateInternals) pushbackStateInternals).snapshotKeyGroupState(
+ keyGroupIndex, out);
+ }
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ if (getKeyedStateBackend() != null) {
+ int totalKeyGroups = getKeyedStateBackend().getNumberOfKeyGroups();
+ KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange();
+
+ for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) {
+ DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream());
+
+ int keyGroupIdx = streamProvider.getKeyGroupId();
+ checkArgument(localKeyGroupRange.contains(keyGroupIdx),
+ "Key Group " + keyGroupIdx + " does not belong to the local range.");
+
+ // if (this instanceof KeyGroupRestoringOperator)
+ restoreKeyGroupState(keyGroupIdx, div);
+
+ // We just initialize our timerService
+ if (keyCoder != null) {
+ if (timerService == null) {
+ timerService = new HeapInternalTimerService<>(
+ totalKeyGroups,
+ localKeyGroupRange,
+ this,
+ getRuntimeContext().getProcessingTimeService());
+ }
+ timerService.restoreTimersForKeyGroup(div, keyGroupIdx, getUserCodeClassloader());
+ }
+ }
+ }
+ }
+
+ @Override
+ public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception {
+ if (!sideInputs.isEmpty() && keyCoder != null) {
+ if (pushbackStateInternals == null) {
+ pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
+ getKeyedStateBackend());
+ }
+ ((FlinkKeyGroupStateInternals) pushbackStateInternals)
+ .restoreKeyGroupState(keyGroupIndex, in, getUserCodeClassloader());
+ }
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception {
+ fireTimer(timer);
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception {
+ fireTimer(timer);
+ }
+
+ // allow overriding this in WindowDoFnOperator
+ public void fireTimer(InternalTimer<?, TimerData> timer) {
+ TimerInternals.TimerData timerData = timer.getNamespace();
+ StateNamespace namespace = timerData.getNamespace();
+ // This is a user timer, so namespace must be WindowNamespace
+ checkArgument(namespace instanceof WindowNamespace);
+ BoundedWindow window = ((WindowNamespace) namespace).getWindow();
+ pushbackDoFnRunner.onTimer(timerData.getTimerId(), window,
+ timerData.getTimestamp(), timerData.getDomain());
+ }
+
+ /**
+ * Factory for creating an {@link DoFnRunners.OutputManager} from
+ * a Flink {@link Output}.
+ */
+ interface OutputManagerFactory<OutputT> extends Serializable {
+ DoFnRunners.OutputManager create(Output<StreamRecord<OutputT>> output);
+ }
+
+ /**
+ * Default implementation of {@link OutputManagerFactory} that creates an
+ * {@link DoFnRunners.OutputManager} that only writes to
+ * a single logical output.
+ */
+ public static class DefaultOutputManagerFactory<OutputT>
+ implements OutputManagerFactory<OutputT> {
+ @Override
+ public DoFnRunners.OutputManager create(final Output<StreamRecord<OutputT>> output) {
+ return new DoFnRunners.OutputManager() {
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
+ // with tagged outputs we can't get around this because we don't
+ // know our own output type...
+ @SuppressWarnings("unchecked")
+ OutputT castValue = (OutputT) value;
+ output.collect(new StreamRecord<>(castValue));
+ }
+ };
+ }
+ }
+
+ /**
+ * Implementation of {@link OutputManagerFactory} that creates an
+ * {@link DoFnRunners.OutputManager} that can write to multiple logical
+ * outputs by unioning them in a {@link RawUnionValue}.
+ */
+ public static class MultiOutputOutputManagerFactory
+ implements OutputManagerFactory<RawUnionValue> {
+
+ Map<TupleTag<?>, Integer> mapping;
+
+ public MultiOutputOutputManagerFactory(Map<TupleTag<?>, Integer> mapping) {
+ this.mapping = mapping;
+ }
+
+ @Override
+ public DoFnRunners.OutputManager create(final Output<StreamRecord<RawUnionValue>> output) {
+ return new DoFnRunners.OutputManager() {
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
+ int intTag = mapping.get(tag);
+ output.collect(new StreamRecord<>(new RawUnionValue(intTag, value)));
+ }
+ };
+ }
+ }
+
+ /**
+ * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow
+ * accessing state or timer internals.
+ */
+ protected class StepContext implements ExecutionContext.StepContext {
+
+ @Override
+ public String getStepName() {
+ return null;
+ }
+
+ @Override
+ public String getTransformName() {
+ return null;
+ }
+
+ @Override
+ public void noteOutput(WindowedValue<?> output) {}
+
+ @Override
+ public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {}
+
+ @Override
+ public <T, W extends BoundedWindow> void writePCollectionViewData(
+ TupleTag<?> tag,
+ Iterable<WindowedValue<T>> data,
+ Coder<Iterable<WindowedValue<T>>> dataCoder,
+ W window,
+ Coder<W> windowCoder) throws IOException {
+ throw new UnsupportedOperationException("Writing side-input data is not supported.");
+ }
+
+ @Override
+ public StateInternals<?> stateInternals() {
+ return stateInternals;
+ }
+
+ @Override
+ public TimerInternals timerInternals() {
+ return timerInternals;
+ }
+ }
+
+ private class FlinkTimerInternals implements TimerInternals {
+
+ @Override
+ public void setTimer(
+ StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
+ setTimer(TimerData.of(timerId, namespace, target, timeDomain));
+ }
+
+ @Deprecated
+ @Override
+ public void setTimer(TimerData timerKey) {
+ long time = timerKey.getTimestamp().getMillis();
+ if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
+ timerService.registerEventTimeTimer(timerKey, time);
+ } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
+ timerService.registerProcessingTimeTimer(timerKey, time);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported time domain: " + timerKey.getDomain());
+ }
+ }
+
+ @Deprecated
+ @Override
+ public void deleteTimer(StateNamespace namespace, String timerId) {
+ throw new UnsupportedOperationException(
+ "Canceling of a timer by ID is not yet supported.");
+ }
+
+ @Override
+ public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+ throw new UnsupportedOperationException(
+ "Canceling of a timer by ID is not yet supported.");
+ }
+
+ @Deprecated
+ @Override
+ public void deleteTimer(TimerData timerKey) {
+ long time = timerKey.getTimestamp().getMillis();
+ if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
+ timerService.deleteEventTimeTimer(timerKey, time);
+ } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
+ timerService.deleteProcessingTimeTimer(timerKey, time);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported time domain: " + timerKey.getDomain());
+ }
+ }
+
+ @Override
+ public Instant currentProcessingTime() {
+ return new Instant(timerService.currentProcessingTime());
+ }
+
+ @Nullable
+ @Override
+ public Instant currentSynchronizedProcessingTime() {
+ return new Instant(timerService.currentProcessingTime());
+ }
+
+ @Override
+ public Instant currentInputWatermarkTime() {
+ return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold()));
+ }
+
+ @Nullable
+ @Override
+ public Instant currentOutputWatermarkTime() {
+ return new Instant(currentOutputWatermark);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
new file mode 100644
index 0000000..dce2e68
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import java.nio.ByteBuffer;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+/**
+ * {@link KeySelector} that takes the key of the {@link KV} inside a {@link WindowedValue} and
+ * returns it encoded with the supplied key {@link Coder}, wrapped in a {@link ByteBuffer}.
+ * Because the selector yields encoded bytes, all key comparisons and hashing happen on the
+ * encoded form of the key rather than on the key object.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class KvToByteBufferKeySelector<K, V>
+    implements KeySelector<WindowedValue<KV<K, V>>, ByteBuffer>,
+        ResultTypeQueryable<ByteBuffer> {
+
+  /** Coder that turns each key into the bytes Flink partitions on. */
+  private final Coder<K> keyCoder;
+
+  /**
+   * Creates a selector that encodes keys with the given coder.
+   *
+   * @param keyCoder coder used to encode the key of each element
+   */
+  public KvToByteBufferKeySelector(Coder<K> keyCoder) {
+    this.keyCoder = keyCoder;
+  }
+
+  /** Encodes the element's key and wraps the resulting bytes without copying them. */
+  @Override
+  public ByteBuffer getKey(WindowedValue<KV<K, V>> value) throws Exception {
+    KV<K, V> element = value.getValue();
+    return ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, element.getKey()));
+  }
+
+  /** Describes the produced key type to Flink as a generic {@link ByteBuffer}. */
+  @Override
+  public TypeInformation<ByteBuffer> getProducedType() {
+    return new GenericTypeInfo<>(ByteBuffer.class);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
new file mode 100644
index 0000000..e843660
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import java.util.Collections;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A {@link KeyedWorkItem} that carries exactly one element and no timers.
+ *
+ * @param <K> the key type
+ * @param <ElemT> the element type
+ */
+public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
+
+  // Package-private (not private), so other classes in this package can read them directly.
+  final K key;
+  final WindowedValue<ElemT> value;
+
+  public SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) {
+    this.key = key;
+    this.value = value;
+  }
+
+  @Override
+  public K key() {
+    return key;
+  }
+
+  /** Returns the single windowed element carried by this work item. */
+  public WindowedValue<ElemT> value() {
+    return value;
+  }
+
+  /** Always empty: a singleton work item carries no timers. */
+  @Override
+  public Iterable<TimerInternals.TimerData> timersIterable() {
+    // Typed empty list rather than the raw Collections.EMPTY_LIST, which needs an unchecked
+    // conversion to Iterable<TimerData>.
+    return Collections.<TimerInternals.TimerData>emptyList();
+  }
+
+  /** Returns a one-element iterable containing {@link #value()}. */
+  @Override
+  public Iterable<WindowedValue<ElemT>> elementsIterable() {
+    return Collections.singletonList(value);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
new file mode 100644
index 0000000..9a52330
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItemCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Singleton keyed work item coder.
+ */
+public class SingletonKeyedWorkItemCoder<K, ElemT>
+ extends StandardCoder<SingletonKeyedWorkItem<K, ElemT>> {
+ /**
+ * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
+ * coder.
+ */
+ public static <K, ElemT> SingletonKeyedWorkItemCoder<K, ElemT> of(
+ Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
+ return new SingletonKeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
+ }
+
+ @JsonCreator
+ public static <K, ElemT> SingletonKeyedWorkItemCoder<K, ElemT> of(
+ @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
+ checkArgument(components.size() == 3, "Expecting 3 components, got %s", components.size());
+ @SuppressWarnings("unchecked")
+ Coder<K> keyCoder = (Coder<K>) components.get(0);
+ @SuppressWarnings("unchecked")
+ Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1);
+ @SuppressWarnings("unchecked")
+ Coder<? extends BoundedWindow> windowCoder = (Coder<? extends BoundedWindow>) components.get(2);
+ return new SingletonKeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
+ }
+
+ private final Coder<K> keyCoder;
+ private final Coder<ElemT> elemCoder;
+ private final Coder<? extends BoundedWindow> windowCoder;
+ private final WindowedValue.FullWindowedValueCoder<ElemT> valueCoder;
+
+ private SingletonKeyedWorkItemCoder(
+ Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
+ this.keyCoder = keyCoder;
+ this.elemCoder = elemCoder;
+ this.windowCoder = windowCoder;
+ valueCoder = WindowedValue.FullWindowedValueCoder.of(elemCoder, windowCoder);
+ }
+
+ public Coder<K> getKeyCoder() {
+ return keyCoder;
+ }
+
+ public Coder<ElemT> getElementCoder() {
+ return elemCoder;
+ }
+
+ @Override
+ public void encode(SingletonKeyedWorkItem<K, ElemT> value,
+ OutputStream outStream,
+ Context context)
+ throws CoderException, IOException {
+ keyCoder.encode(value.key(), outStream, context.nested());
+ valueCoder.encode(value.value, outStream, context);
+ }
+
+ @Override
+ public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context context)
+ throws CoderException, IOException {
+ K key = keyCoder.decode(inStream, context.nested());
+ WindowedValue<ElemT> value = valueCoder.decode(inStream, context);
+ return new SingletonKeyedWorkItem<>(key, value);
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return ImmutableList.of(keyCoder, elemCoder, windowCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ keyCoder.verifyDeterministic();
+ elemCoder.verifyDeterministic();
+ windowCoder.verifyDeterministic();
+ }
+
+ /**
+ * {@inheritDoc}.
+ *
+ * {@link KeyedWorkItemCoder} is not consistent with equals as it can return a
+ * {@link KeyedWorkItem} of a type different from the originally encoded type.
+ */
+ @Override
+ public boolean consistentWithEquals() {
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
new file mode 100644
index 0000000..40f70e4
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.beam.runners.core.ElementAndRestriction;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Flink operator for executing splittable {@link DoFn DoFns}. Specifically, for executing
+ * the {@code @ProcessElement} method of a splittable {@link DoFn}.
+ */
+public class SplittableDoFnOperator<
+ InputT, FnOutputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+ extends DoFnOperator<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT, OutputT> {
+
+ public SplittableDoFnOperator(
+ DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT> doFn,
+ Coder<
+ WindowedValue<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
+ TupleTag<FnOutputT> mainOutputTag,
+ List<TupleTag<?>> additionalOutputTags,
+ OutputManagerFactory<OutputT> outputManagerFactory,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Map<Integer, PCollectionView<?>> sideInputTagMapping,
+ Collection<PCollectionView<?>> sideInputs,
+ PipelineOptions options,
+ Coder<?> keyCoder) {
+ super(
+ doFn,
+ inputCoder,
+ mainOutputTag,
+ additionalOutputTags,
+ outputManagerFactory,
+ windowingStrategy,
+ sideInputTagMapping,
+ sideInputs,
+ options,
+ keyCoder);
+
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ checkState(doFn instanceof SplittableParDo.ProcessFn);
+
+ StateInternalsFactory<String> stateInternalsFactory = new StateInternalsFactory<String>() {
+ @Override
+ public StateInternals<String> stateInternalsForKey(String key) {
+ //this will implicitly be keyed by the key of the incoming
+ // element or by the key of a firing timer
+ return (StateInternals<String>) stateInternals;
+ }
+ };
+ TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() {
+ @Override
+ public TimerInternals timerInternalsForKey(String key) {
+ //this will implicitly be keyed like the StateInternalsFactory
+ return timerInternals;
+ }
+ };
+
+ ((SplittableParDo.ProcessFn) doFn).setStateInternalsFactory(stateInternalsFactory);
+ ((SplittableParDo.ProcessFn) doFn).setTimerInternalsFactory(timerInternalsFactory);
+ ((SplittableParDo.ProcessFn) doFn).setProcessElementInvoker(
+ new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
+ doFn,
+ serializedOptions.getPipelineOptions(),
+ new OutputWindowedValue<FnOutputT>() {
+ @Override
+ public void outputWindowedValue(
+ FnOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputManager.output(
+ mainOutputTag,
+ WindowedValue.of(output, timestamp, windows, pane));
+ }
+
+ @Override
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
+ }
+ },
+ sideInputReader,
+ Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
+ 10000,
+ Duration.standardSeconds(10)));
+ }
+
+ @Override
+ public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
+ doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+ KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
+ (String) stateInternals.getKey(),
+ Collections.singletonList(timer.getNamespace()))));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
new file mode 100644
index 0000000..9b2136c
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import static org.apache.beam.runners.core.TimerInternals.TimerData;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+
+/**
+ * Flink operator for executing window {@link DoFn DoFns}.
+ */
+public class WindowDoFnOperator<K, InputT, OutputT>
+ extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> {
+
+ private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
+
+ public WindowDoFnOperator(
+ SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn,
+ Coder<WindowedValue<KeyedWorkItem<K, InputT>>> inputCoder,
+ TupleTag<KV<K, OutputT>> mainOutputTag,
+ List<TupleTag<?>> additionalOutputTags,
+ OutputManagerFactory<WindowedValue<KV<K, OutputT>>> outputManagerFactory,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Map<Integer, PCollectionView<?>> sideInputTagMapping,
+ Collection<PCollectionView<?>> sideInputs,
+ PipelineOptions options,
+ Coder<K> keyCoder) {
+ super(
+ null,
+ inputCoder,
+ mainOutputTag,
+ additionalOutputTags,
+ outputManagerFactory,
+ windowingStrategy,
+ sideInputTagMapping,
+ sideInputs,
+ options,
+ keyCoder);
+
+ this.systemReduceFn = systemReduceFn;
+
+ }
+
+ @Override
+ protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
+ StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() {
+ @Override
+ public StateInternals<K> stateInternalsForKey(K key) {
+ //this will implicitly be keyed by the key of the incoming
+ // element or by the key of a firing timer
+ return (StateInternals<K>) stateInternals;
+ }
+ };
+ TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() {
+ @Override
+ public TimerInternals timerInternalsForKey(K key) {
+ //this will implicitly be keyed like the StateInternalsFactory
+ return timerInternals;
+ }
+ };
+
+ // we have to do the unchecked cast because GroupAlsoByWindowViaWindowSetDoFn.create
+ // has the window type as generic parameter while WindowingStrategy is almost always
+ // untyped.
+ @SuppressWarnings("unchecked")
+ DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn =
+ GroupAlsoByWindowViaWindowSetNewDoFn.create(
+ windowingStrategy, stateInternalsFactory, timerInternalsFactory, sideInputReader,
+ (SystemReduceFn) systemReduceFn, outputManager, mainOutputTag);
+ return doFn;
+ }
+
+ @Override
+ public void fireTimer(InternalTimer<?, TimerData> timer) {
+ doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+ KeyedWorkItems.<K, InputT>timersWorkItem(
+ (K) stateInternals.getKey(),
+ Collections.singletonList(timer.getNamespace()))));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
new file mode 100644
index 0000000..1dff367
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import java.nio.ByteBuffer;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+/**
+ * {@link KeySelector} that takes the key of a {@link SingletonKeyedWorkItem} (a
+ * {@link KeyedWorkItem}) held in a {@link WindowedValue}, encodes it with the supplied
+ * {@link Coder}, and hands it to Flink wrapped in a {@link ByteBuffer}. Keying on the encoded
+ * bytes means every key comparison and hash is computed over the encoded form.
+ *
+ * @param <K> the key type
+ * @param <V> the element type of the work item
+ */
+public class WorkItemKeySelector<K, V>
+    implements KeySelector<WindowedValue<SingletonKeyedWorkItem<K, V>>, ByteBuffer>,
+        ResultTypeQueryable<ByteBuffer> {
+
+  private final Coder<K> keyCoder;
+
+  public WorkItemKeySelector(Coder<K> keyCoder) {
+    this.keyCoder = keyCoder;
+  }
+
+  /** Encodes the work item's key with {@code keyCoder} and wraps the resulting byte array. */
+  @Override
+  public ByteBuffer getKey(WindowedValue<SingletonKeyedWorkItem<K, V>> value) throws Exception {
+    SingletonKeyedWorkItem<K, V> workItem = value.getValue();
+    return ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, workItem.key()));
+  }
+
+  /** Reports the produced key type to Flink as a generic {@link ByteBuffer}. */
+  @Override
+  public TypeInformation<ByteBuffer> getProducedType() {
+    TypeInformation<ByteBuffer> byteBufferType = new GenericTypeInfo<>(ByteBuffer.class);
+    return byteBufferType;
+  }
+}