You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/21 17:53:04 UTC
[20/50] [abbrv] beam git commit: [BEAM-1994] Remove Flink examples package
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
deleted file mode 100644
index af4b354..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
deleted file mode 100644
index 9b449aa..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeinfo.TypeInformation} for
- * Dataflow {@link org.apache.beam.sdk.coders.Coder}s.
- */
-public class CoderTypeInformation<T> extends TypeInformation<T> implements AtomicType<T> {
-
- private final Coder<T> coder;
-
- public CoderTypeInformation(Coder<T> coder) {
- checkNotNull(coder);
- this.coder = coder;
- }
-
- public Coder<T> getCoder() {
- return coder;
- }
-
- @Override
- public boolean isBasicType() {
- return false;
- }
-
- @Override
- public boolean isTupleType() {
- return false;
- }
-
- @Override
- public int getArity() {
- return 1;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public Class<T> getTypeClass() {
- // We don't have the Class, so we have to pass null here. What a shame...
- return (Class<T>) Object.class;
- }
-
- @Override
- public boolean isKeyType() {
- return true;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public TypeSerializer<T> createSerializer(ExecutionConfig config) {
- return new CoderTypeSerializer<>(coder);
- }
-
- @Override
- public int getTotalFields() {
- return 2;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- CoderTypeInformation that = (CoderTypeInformation) o;
-
- return coder.equals(that.coder);
-
- }
-
- @Override
- public int hashCode() {
- return coder.hashCode();
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof CoderTypeInformation;
- }
-
- @Override
- public String toString() {
- return "CoderTypeInformation{coder=" + coder + '}';
- }
-
- @Override
- public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig
- executionConfig) {
- throw new UnsupportedOperationException(
- "Non-encoded values cannot be compared directly.");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
deleted file mode 100644
index e210ed9..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import java.io.EOFException;
-import java.io.IOException;
-import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for
- * Dataflow {@link org.apache.beam.sdk.coders.Coder Coders}.
- */
-public class CoderTypeSerializer<T> extends TypeSerializer<T> {
-
- private Coder<T> coder;
-
- public CoderTypeSerializer(Coder<T> coder) {
- this.coder = coder;
- }
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public CoderTypeSerializer<T> duplicate() {
- return new CoderTypeSerializer<>(coder);
- }
-
- @Override
- public T createInstance() {
- return null;
- }
-
- @Override
- public T copy(T t) {
- try {
- return CoderUtils.clone(coder, t);
- } catch (CoderException e) {
- throw new RuntimeException("Could not clone.", e);
- }
- }
-
- @Override
- public T copy(T t, T reuse) {
- return copy(t);
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
- @Override
- public void serialize(T t, DataOutputView dataOutputView) throws IOException {
- DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView);
- coder.encode(t, outputWrapper, Coder.Context.NESTED);
- }
-
- @Override
- public T deserialize(DataInputView dataInputView) throws IOException {
- try {
- DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView);
- return coder.decode(inputWrapper, Coder.Context.NESTED);
- } catch (CoderException e) {
- Throwable cause = e.getCause();
- if (cause instanceof EOFException) {
- throw (EOFException) cause;
- } else {
- throw e;
- }
- }
- }
-
- @Override
- public T deserialize(T t, DataInputView dataInputView) throws IOException {
- return deserialize(dataInputView);
- }
-
- @Override
- public void copy(
- DataInputView dataInputView,
- DataOutputView dataOutputView) throws IOException {
- serialize(deserialize(dataInputView), dataOutputView);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- CoderTypeSerializer that = (CoderTypeSerializer) o;
- return coder.equals(that.coder);
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof CoderTypeSerializer;
- }
-
- @Override
- public int hashCode() {
- return coder.hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java
deleted file mode 100644
index 667ef45..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import java.io.IOException;
-import java.util.Arrays;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for Beam values that have
- * been encoded to byte data by a {@link Coder}.
- */
-public class EncodedValueComparator extends TypeComparator<byte[]> {
-
- /** For storing the Reference in encoded form. */
- private transient byte[] encodedReferenceKey;
-
- private final boolean ascending;
-
- public EncodedValueComparator(boolean ascending) {
- this.ascending = ascending;
- }
-
- @Override
- public int hash(byte[] record) {
- return Arrays.hashCode(record);
- }
-
- @Override
- public void setReference(byte[] toCompare) {
- this.encodedReferenceKey = toCompare;
- }
-
- @Override
- public boolean equalToReference(byte[] candidate) {
- if (encodedReferenceKey.length != candidate.length) {
- return false;
- }
- int len = candidate.length;
- for (int i = 0; i < len; i++) {
- if (encodedReferenceKey[i] != candidate[i]) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public int compareToReference(TypeComparator<byte[]> other) {
- // VERY IMPORTANT: compareToReference does not behave like Comparable.compare
- // the meaning of the return value is inverted.
-
- EncodedValueComparator otherEncodedValueComparator = (EncodedValueComparator) other;
-
- int len = Math.min(
- encodedReferenceKey.length,
- otherEncodedValueComparator.encodedReferenceKey.length);
-
- for (int i = 0; i < len; i++) {
- byte b1 = encodedReferenceKey[i];
- byte b2 = otherEncodedValueComparator.encodedReferenceKey[i];
- int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));
- if (result != 0) {
- return ascending ? -result : result;
- }
- }
- int result =
- encodedReferenceKey.length - otherEncodedValueComparator.encodedReferenceKey.length;
- return ascending ? -result : result;
- }
-
-
- @Override
- public int compare(byte[] first, byte[] second) {
- int len = Math.min(first.length, second.length);
- for (int i = 0; i < len; i++) {
- byte b1 = first[i];
- byte b2 = second[i];
- int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));
- if (result != 0) {
- return ascending ? result : -result;
- }
- }
- int result = first.length - second.length;
- return ascending ? result : -result;
- }
-
- @Override
- public int compareSerialized(
- DataInputView firstSource,
- DataInputView secondSource) throws IOException {
- int lengthFirst = firstSource.readInt();
- int lengthSecond = secondSource.readInt();
-
- int len = Math.min(lengthFirst, lengthSecond);
- for (int i = 0; i < len; i++) {
- byte b1 = firstSource.readByte();
- byte b2 = secondSource.readByte();
- int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));
- if (result != 0) {
- return ascending ? result : -result;
- }
- }
-
- int result = lengthFirst - lengthSecond;
- return ascending ? result : -result;
- }
-
-
-
- @Override
- public boolean supportsNormalizedKey() {
- // disabled because this seems to not work with some coders,
- // such as the AvroCoder
- return false;
- }
-
- @Override
- public boolean supportsSerializationWithKeyNormalization() {
- return false;
- }
-
- @Override
- public int getNormalizeKeyLen() {
- return Integer.MAX_VALUE;
- }
-
- @Override
- public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
- return true;
- }
-
- @Override
- public void putNormalizedKey(byte[] record, MemorySegment target, int offset, int numBytes) {
- final int limit = offset + numBytes;
-
- target.put(offset, record, 0, Math.min(numBytes, record.length));
-
- offset += record.length;
-
- while (offset < limit) {
- target.put(offset++, (byte) 0);
- }
- }
-
- @Override
- public void writeWithKeyNormalization(byte[] record, DataOutputView target) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public byte[] readWithKeyDenormalization(byte[] reuse, DataInputView source) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean invertNormalizedKey() {
- return !ascending;
- }
-
- @Override
- public TypeComparator<byte[]> duplicate() {
- return new EncodedValueComparator(ascending);
- }
-
- @Override
- public int extractKeys(Object record, Object[] target, int index) {
- target[index] = record;
- return 1;
- }
-
- @Override
- public TypeComparator[] getFlatComparators() {
- return new TypeComparator[] { this.duplicate() };
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
deleted file mode 100644
index 41db61e..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import java.io.IOException;
-
-import org.apache.beam.sdk.coders.Coder;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * {@link TypeSerializer} for values that were encoded using a {@link Coder}.
- */
-public final class EncodedValueSerializer extends TypeSerializer<byte[]> {
-
- private static final long serialVersionUID = 1L;
-
- private static final byte[] EMPTY = new byte[0];
-
- @Override
- public boolean isImmutableType() {
- return true;
- }
-
- @Override
- public byte[] createInstance() {
- return EMPTY;
- }
-
- @Override
- public byte[] copy(byte[] from) {
- return from;
- }
-
- @Override
- public byte[] copy(byte[] from, byte[] reuse) {
- return copy(from);
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
-
- @Override
- public void serialize(byte[] record, DataOutputView target) throws IOException {
- if (record == null) {
- throw new IllegalArgumentException("The record must not be null.");
- }
-
- final int len = record.length;
- target.writeInt(len);
- target.write(record);
- }
-
- @Override
- public byte[] deserialize(DataInputView source) throws IOException {
- final int len = source.readInt();
- byte[] result = new byte[len];
- source.readFully(result);
- return result;
- }
-
- @Override
- public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
- return deserialize(source);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- final int len = source.readInt();
- target.writeInt(len);
- target.write(source, len);
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof EncodedValueSerializer;
- }
-
- @Override
- public int hashCode() {
- return this.getClass().hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj instanceof EncodedValueSerializer;
- }
-
- @Override
- public TypeSerializer<byte[]> duplicate() {
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java
deleted file mode 100644
index e24bf31..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-/**
- * Flink {@link TypeInformation} for Beam values that have been encoded to byte data
- * by a {@link Coder}.
- */
-public class EncodedValueTypeInformation
- extends TypeInformation<byte[]>
- implements AtomicType<byte[]> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean isBasicType() {
- return false;
- }
-
- @Override
- public boolean isTupleType() {
- return false;
- }
-
- @Override
- public int getArity() {
- return 0;
- }
-
- @Override
- public int getTotalFields() {
- return 0;
- }
-
- @Override
- public Class<byte[]> getTypeClass() {
- return byte[].class;
- }
-
- @Override
- public boolean isKeyType() {
- return true;
- }
-
- @Override
- public TypeSerializer<byte[]> createSerializer(ExecutionConfig executionConfig) {
- return new EncodedValueSerializer();
- }
-
- @Override
- public boolean equals(Object other) {
- return other instanceof EncodedValueTypeInformation;
- }
-
- @Override
- public int hashCode() {
- return this.getClass().hashCode();
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof EncodedValueTypeInformation;
- }
-
- @Override
- public String toString() {
- return "EncodedValueTypeInformation";
- }
-
- @Override
- public TypeComparator<byte[]> createComparator(
- boolean sortOrderAscending,
- ExecutionConfig executionConfig) {
- return new EncodedValueComparator(sortOrderAscending);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
deleted file mode 100644
index 36b5ba3..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import java.io.ByteArrayOutputStream;
-
-/**
- * Version of {@link java.io.ByteArrayOutputStream} that allows to retrieve the internal
- * byte[] buffer without incurring an array copy.
- */
-public class InspectableByteArrayOutputStream extends ByteArrayOutputStream {
-
- /**
- * Get the underlying byte array.
- */
- public byte[] getBuffer() {
- return buf;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java
deleted file mode 100644
index 9df6836..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
-/**
- * {@link KeySelector} that extracts the key from a {@link KV} and returns
- * it in encoded form as a {@code byte} array.
- */
-public class KvKeySelector<InputT, K>
- implements KeySelector<WindowedValue<KV<K, InputT>>, byte[]>, ResultTypeQueryable<byte[]> {
-
- private final Coder<K> keyCoder;
-
- public KvKeySelector(Coder<K> keyCoder) {
- this.keyCoder = keyCoder;
- }
-
- @Override
- public byte[] getKey(WindowedValue<KV<K, InputT>> value) throws Exception {
- return CoderUtils.encodeToByteArray(keyCoder, value.getValue().getKey());
- }
-
- @Override
- public TypeInformation<byte[]> getProducedType() {
- return new EncodedValueTypeInformation();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
deleted file mode 100644
index 6fb3182..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation.types;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
deleted file mode 100644
index 2256bb1..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink.translation.utils;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.IOChannelUtils;
-
-/**
- * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
- */
-public class SerializedPipelineOptions implements Serializable {
-
- private final byte[] serializedOptions;
-
- /** Lazily initialized copy of deserialized options. */
- private transient PipelineOptions pipelineOptions;
-
- public SerializedPipelineOptions(PipelineOptions options) {
- checkNotNull(options, "PipelineOptions must not be null.");
-
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- new ObjectMapper().writeValue(baos, options);
- this.serializedOptions = baos.toByteArray();
- } catch (Exception e) {
- throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
- }
-
- }
-
- public PipelineOptions getPipelineOptions() {
- if (pipelineOptions == null) {
- try {
- pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
-
- IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
- FileSystems.setDefaultConfigInWorkers(pipelineOptions);
- } catch (IOException e) {
- throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
- }
- }
-
- return pipelineOptions;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
deleted file mode 100644
index 5dedd53..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation.utils;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
deleted file mode 100644
index 82a2c4e..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import org.apache.flink.core.memory.DataInputView;
-
-/**
- * Wrapper for {@link DataInputView}. We need this because Flink reads data using a
- * {@link org.apache.flink.core.memory.DataInputView} while
- * Beam {@link org.apache.beam.sdk.coders.Coder}s expect an
- * {@link java.io.InputStream}.
- */
-public class DataInputViewWrapper extends InputStream {
-
- private DataInputView inputView;
-
- public DataInputViewWrapper(DataInputView inputView) {
- this.inputView = inputView;
- }
-
- public void setInputView(DataInputView inputView) {
- this.inputView = inputView;
- }
-
- @Override
- public int read() throws IOException {
- try {
- return inputView.readUnsignedByte();
- } catch (EOFException e) {
- // translate between DataInput and InputStream,
- // DataInput signals EOF by exception, InputStream does it by returning -1
- return -1;
- }
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- return inputView.read(b, off, len);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
deleted file mode 100644
index f2d9db2..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * Wrapper for {@link org.apache.flink.core.memory.DataOutputView}. We need this because
- * Flink writes data using a {@link org.apache.flink.core.memory.DataOutputView} while
- * Beam {@link org.apache.beam.sdk.coders.Coder}s expect an
- * {@link java.io.OutputStream}.
- */
-public class DataOutputViewWrapper extends OutputStream {
-
- private DataOutputView outputView;
-
- public DataOutputViewWrapper(DataOutputView outputView) {
- this.outputView = outputView;
- }
-
- public void setOutputView(DataOutputView outputView) {
- this.outputView = outputView;
- }
-
- @Override
- public void write(int b) throws IOException {
- outputView.write(b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- outputView.write(b, off, len);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
deleted file mode 100644
index 70d97e3..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.Serializable;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.flink.api.common.accumulators.Accumulator;
-
-/**
- * Wrapper that wraps a {@link org.apache.beam.sdk.transforms.Combine.CombineFn}
- * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using
- * the function as an aggregator in a {@link org.apache.beam.sdk.transforms.ParDo}
- * operation.
- */
-public class SerializableFnAggregatorWrapper<InputT, OutputT>
- implements Aggregator<InputT, OutputT>, Accumulator<InputT, Serializable> {
-
- private OutputT aa;
- private Combine.CombineFn<InputT, ?, OutputT> combiner;
-
- public SerializableFnAggregatorWrapper(Combine.CombineFn<InputT, ?, OutputT> combiner) {
- this.combiner = combiner;
- resetLocal();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void add(InputT value) {
- this.aa = combiner.apply(ImmutableList.of((InputT) aa, value));
- }
-
- @Override
- public Serializable getLocalValue() {
- return (Serializable) aa;
- }
-
- @Override
- public void resetLocal() {
- this.aa = combiner.apply(ImmutableList.<InputT>of());
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void merge(Accumulator<InputT, Serializable> other) {
- this.aa = combiner.apply(ImmutableList.of((InputT) aa, (InputT) other.getLocalValue()));
- }
-
- @Override
- public void addValue(InputT value) {
- add(value);
- }
-
- @Override
- public String getName() {
- return "Aggregator :" + combiner.toString();
- }
-
- @Override
- public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() {
- return combiner;
- }
-
- @Override
- public Accumulator<InputT, Serializable> clone() {
- try {
- super.clone();
- } catch (CloneNotSupportedException e) {
- // Flink Accumulators cannot throw CloneNotSupportedException, work around that.
- throw new RuntimeException(e);
- }
-
- // copy it by merging
- OutputT resultCopy = combiner.apply(Lists.newArrayList((InputT) aa));
- SerializableFnAggregatorWrapper<InputT, OutputT> result = new
- SerializableFnAggregatorWrapper<>(combiner);
-
- result.aa = resultCopy;
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
deleted file mode 100644
index a87472b..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Wrapper for executing a {@link Source} as a Flink {@link InputFormat}.
- */
-public class SourceInputFormat<T>
- implements InputFormat<WindowedValue<T>, SourceInputSplit<T>> {
- private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);
-
- private final BoundedSource<T> initialSource;
-
- private transient PipelineOptions options;
- private final SerializedPipelineOptions serializedOptions;
-
- private transient BoundedSource.BoundedReader<T> reader;
- private boolean inputAvailable = false;
-
- public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
- this.initialSource = initialSource;
- this.serializedOptions = new SerializedPipelineOptions(options);
- }
-
- @Override
- public void configure(Configuration configuration) {
- options = serializedOptions.getPipelineOptions();
- }
-
- @Override
- public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
- reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
- inputAvailable = reader.start();
- }
-
- @Override
- public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
- try {
- final long estimatedSize = initialSource.getEstimatedSizeBytes(options);
-
- return new BaseStatistics() {
- @Override
- public long getTotalInputSize() {
- return estimatedSize;
- }
-
- @Override
- public long getNumberOfRecords() {
- return BaseStatistics.NUM_RECORDS_UNKNOWN;
- }
-
- @Override
- public float getAverageRecordWidth() {
- return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
- }
- };
- } catch (Exception e) {
- LOG.warn("Could not read Source statistics: {}", e);
- }
-
- return null;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
- try {
- long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
- List<? extends Source<T>> shards =
- initialSource.split(desiredSizeBytes, options);
- int numShards = shards.size();
- SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards];
- for (int i = 0; i < numShards; i++) {
- sourceInputSplits[i] = new SourceInputSplit<>(shards.get(i), i);
- }
- return sourceInputSplits;
- } catch (Exception e) {
- throw new IOException("Could not create input splits from Source.", e);
- }
- }
-
- @Override
- public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) {
- return new DefaultInputSplitAssigner(sourceInputSplits);
- }
-
-
- @Override
- public boolean reachedEnd() throws IOException {
- return !inputAvailable;
- }
-
- @Override
- public WindowedValue<T> nextRecord(WindowedValue<T> t) throws IOException {
- if (inputAvailable) {
- final T current = reader.getCurrent();
- final Instant timestamp = reader.getCurrentTimestamp();
- // advance reader to have a record ready next time
- inputAvailable = reader.advance();
- return WindowedValue.of(
- current,
- timestamp,
- GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
- }
-
- return null;
- }
-
- @Override
- public void close() throws IOException {
- // TODO null check can be removed once FLINK-3796 is fixed
- if (reader != null) {
- reader.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java
deleted file mode 100644
index e4a7386..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import org.apache.beam.sdk.io.Source;
-import org.apache.flink.core.io.InputSplit;
-
-/**
- * {@link org.apache.flink.core.io.InputSplit} for
- * {@link org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat}. We pass
- * the sharded Source around in the input split because Sources simply split up into several
- * Sources for sharding. This is different to how Flink creates a separate InputSplit from
- * an InputFormat.
- */
-public class SourceInputSplit<T> implements InputSplit {
-
- private Source<T> source;
- private int splitNumber;
-
- public SourceInputSplit() {
- }
-
- public SourceInputSplit(Source<T> source, int splitNumber) {
- this.source = source;
- this.splitNumber = splitNumber;
- }
-
- @Override
- public int getSplitNumber() {
- return splitNumber;
- }
-
- public Source<T> getSource() {
- return source;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
deleted file mode 100644
index 72f7deb..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
deleted file mode 100644
index 8a09286..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ /dev/null
@@ -1,774 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
-import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
-import org.apache.beam.runners.core.SideInputHandler;
-import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.StatefulDoFnRunner;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.NullSideInputReader;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
-import org.apache.flink.runtime.state.KeyGroupsList;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.HeapInternalTimerService;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.Triggerable;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.joda.time.Instant;
-
-/**
- * Flink operator for executing {@link DoFn DoFns}.
- *
- * @param <InputT> the input type of the {@link DoFn}
- * @param <FnOutputT> the output type of the {@link DoFn}
- * @param <OutputT> the output type of the operator, this can be different from the fn output
- * type when we have additional tagged outputs
- */
-public class DoFnOperator<InputT, FnOutputT, OutputT>
- extends AbstractStreamOperator<OutputT>
- implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>,
- TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT>,
- KeyGroupCheckpointedOperator, Triggerable<Object, TimerData> {
-
- protected DoFn<InputT, FnOutputT> doFn;
-
- protected final SerializedPipelineOptions serializedOptions;
-
- protected final TupleTag<FnOutputT> mainOutputTag;
- protected final List<TupleTag<?>> additionalOutputTags;
-
- protected final Collection<PCollectionView<?>> sideInputs;
- protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
-
- protected final WindowingStrategy<?, ?> windowingStrategy;
-
- protected final OutputManagerFactory<OutputT> outputManagerFactory;
-
- protected transient DoFnRunner<InputT, FnOutputT> doFnRunner;
- protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner;
-
- protected transient SideInputHandler sideInputHandler;
-
- protected transient SideInputReader sideInputReader;
-
- protected transient DoFnRunners.OutputManager outputManager;
-
- private transient DoFnInvoker<InputT, FnOutputT> doFnInvoker;
-
- protected transient long currentInputWatermark;
-
- protected transient long currentOutputWatermark;
-
- private transient StateTag<Object, BagState<WindowedValue<InputT>>> pushedBackTag;
-
- protected transient FlinkStateInternals<?> stateInternals;
-
- private Coder<WindowedValue<InputT>> inputCoder;
-
- private final Coder<?> keyCoder;
-
- private final TimerInternals.TimerDataCoder timerCoder;
-
- protected transient HeapInternalTimerService<?, TimerInternals.TimerData> timerService;
-
- protected transient FlinkTimerInternals timerInternals;
-
- private transient StateInternals<?> pushbackStateInternals;
-
- private transient Optional<Long> pushedBackWatermark;
-
- public DoFnOperator(
- DoFn<InputT, FnOutputT> doFn,
- Coder<WindowedValue<InputT>> inputCoder,
- TupleTag<FnOutputT> mainOutputTag,
- List<TupleTag<?>> additionalOutputTags,
- OutputManagerFactory<OutputT> outputManagerFactory,
- WindowingStrategy<?, ?> windowingStrategy,
- Map<Integer, PCollectionView<?>> sideInputTagMapping,
- Collection<PCollectionView<?>> sideInputs,
- PipelineOptions options,
- Coder<?> keyCoder) {
- this.doFn = doFn;
- this.inputCoder = inputCoder;
- this.mainOutputTag = mainOutputTag;
- this.additionalOutputTags = additionalOutputTags;
- this.sideInputTagMapping = sideInputTagMapping;
- this.sideInputs = sideInputs;
- this.serializedOptions = new SerializedPipelineOptions(options);
- this.windowingStrategy = windowingStrategy;
- this.outputManagerFactory = outputManagerFactory;
-
- setChainingStrategy(ChainingStrategy.ALWAYS);
-
- this.keyCoder = keyCoder;
-
- this.timerCoder =
- TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
- }
-
- private ExecutionContext.StepContext createStepContext() {
- return new StepContext();
- }
-
- // allow overriding this in WindowDoFnOperator because this one dynamically creates
- // the DoFn
- protected DoFn<InputT, FnOutputT> getDoFn() {
- return doFn;
- }
-
- @Override
- public void open() throws Exception {
- super.open();
-
- currentInputWatermark = Long.MIN_VALUE;
- currentOutputWatermark = Long.MIN_VALUE;
-
- AggregatorFactory aggregatorFactory = new AggregatorFactory() {
- @Override
- public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
- Class<?> fnClass,
- ExecutionContext.StepContext stepContext,
- String aggregatorName,
- Combine.CombineFn<InputT, AccumT, OutputT> combine) {
-
- @SuppressWarnings("unchecked")
- SerializableFnAggregatorWrapper<InputT, OutputT> result =
- (SerializableFnAggregatorWrapper<InputT, OutputT>)
- getRuntimeContext().getAccumulator(aggregatorName);
-
- if (result == null) {
- result = new SerializableFnAggregatorWrapper<>(combine);
- getRuntimeContext().addAccumulator(aggregatorName, result);
- }
- return result;
- }
- };
-
- sideInputReader = NullSideInputReader.of(sideInputs);
-
- if (!sideInputs.isEmpty()) {
-
- pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
-
- FlinkBroadcastStateInternals sideInputStateInternals =
- new FlinkBroadcastStateInternals<>(
- getContainingTask().getIndexInSubtaskGroup(), getOperatorStateBackend());
-
- sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
- sideInputReader = sideInputHandler;
-
- // may already have been initialized by initializeState
- if (pushbackStateInternals == null) {
- if (keyCoder != null) {
- pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
- getKeyedStateBackend());
- } else {
- pushbackStateInternals =
- new FlinkSplitStateInternals<Object>(getOperatorStateBackend());
- }
- }
-
- pushedBackWatermark = Optional.absent();
-
- }
-
- outputManager = outputManagerFactory.create(output);
-
- // StatefulPardo or WindowDoFn
- if (keyCoder != null) {
- stateInternals = new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(),
- keyCoder);
-
- timerService = (HeapInternalTimerService<?, TimerInternals.TimerData>)
- getInternalTimerService("beam-timer", new CoderTypeSerializer<>(timerCoder), this);
-
- timerInternals = new FlinkTimerInternals();
-
- }
-
- // WindowDoFnOperator needs state and timers to create its DoFn,
- // so this must wait until StateInternals and TimerInternals are ready.
- this.doFn = getDoFn();
- doFnInvoker = DoFnInvokers.invokerFor(doFn);
-
- doFnInvoker.invokeSetup();
-
- ExecutionContext.StepContext stepContext = createStepContext();
-
- doFnRunner = DoFnRunners.simpleRunner(
- serializedOptions.getPipelineOptions(),
- doFn,
- sideInputReader,
- outputManager,
- mainOutputTag,
- additionalOutputTags,
- stepContext,
- aggregatorFactory,
- windowingStrategy);
-
- if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) {
- // When the doFn is this, we know it came from WindowDoFnOperator and
- // InputT = KeyedWorkItem<K, V>
- // OutputT = KV<K, V>
- //
- // for some K, V
-
-
- doFnRunner = DoFnRunners.lateDataDroppingRunner(
- (DoFnRunner) doFnRunner,
- stepContext,
- windowingStrategy,
- ((GroupAlsoByWindowViaWindowSetNewDoFn) doFn).getDroppedDueToLatenessAggregator());
- } else if (keyCoder != null) {
- // It is a stateful DoFn
-
- StatefulDoFnRunner.CleanupTimer cleanupTimer =
- new StatefulDoFnRunner.TimeInternalsCleanupTimer(
- stepContext.timerInternals(), windowingStrategy);
-
- // we don't know the window type
- @SuppressWarnings({"unchecked", "rawtypes"})
- Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- StatefulDoFnRunner.StateCleaner<?> stateCleaner =
- new StatefulDoFnRunner.StateInternalsStateCleaner<>(
- doFn, stepContext.stateInternals(), windowCoder);
-
- doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
- doFn,
- doFnRunner,
- stepContext,
- aggregatorFactory,
- windowingStrategy,
- cleanupTimer,
- stateCleaner);
- }
-
- pushbackDoFnRunner =
- SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- doFnInvoker.invokeTeardown();
- }
-
- protected final long getPushbackWatermarkHold() {
- // if we don't have side inputs we never hold the watermark
- if (sideInputs.isEmpty()) {
- return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
- }
-
- try {
- checkInitPushedBackWatermark();
- return pushedBackWatermark.get();
- } catch (Exception e) {
- throw new RuntimeException("Error retrieving pushed back watermark state.", e);
- }
- }
-
- private void checkInitPushedBackWatermark() {
- // init and restore from pushedBack state.
- // Not done in initializeState, because OperatorState is not ready.
- if (!pushedBackWatermark.isPresent()) {
-
- BagState<WindowedValue<InputT>> pushedBack =
- pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
- long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
- for (WindowedValue<InputT> value : pushedBack.read()) {
- min = Math.min(min, value.getTimestamp().getMillis());
- }
- setPushedBackWatermark(min);
- }
- }
-
- @Override
- public final void processElement(
- StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
- doFnRunner.startBundle();
- doFnRunner.processElement(streamRecord.getValue());
- doFnRunner.finishBundle();
- }
-
- private void setPushedBackWatermark(long watermark) {
- pushedBackWatermark = Optional.fromNullable(watermark);
- }
-
- @Override
- public final void processElement1(
- StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
- pushbackDoFnRunner.startBundle();
- Iterable<WindowedValue<InputT>> justPushedBack =
- pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue());
-
- BagState<WindowedValue<InputT>> pushedBack =
- pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
- checkInitPushedBackWatermark();
-
- long min = pushedBackWatermark.get();
- for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
- min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
- pushedBack.add(pushedBackValue);
- }
- setPushedBackWatermark(min);
- pushbackDoFnRunner.finishBundle();
- }
-
- @Override
- public final void processElement2(
- StreamRecord<RawUnionValue> streamRecord) throws Exception {
- pushbackDoFnRunner.startBundle();
-
- @SuppressWarnings("unchecked")
- WindowedValue<Iterable<?>> value =
- (WindowedValue<Iterable<?>>) streamRecord.getValue().getValue();
-
- PCollectionView<?> sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag());
- sideInputHandler.addSideInputValue(sideInput, value);
-
- BagState<WindowedValue<InputT>> pushedBack =
- pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
- List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
-
- Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
- if (pushedBackContents != null) {
- for (WindowedValue<InputT> elem : pushedBackContents) {
-
- // we need to set the correct key in case the operator is
- // a (keyed) window operator
- setKeyContextElement1(new StreamRecord<>(elem));
-
- Iterable<WindowedValue<InputT>> justPushedBack =
- pushbackDoFnRunner.processElementInReadyWindows(elem);
- Iterables.addAll(newPushedBack, justPushedBack);
- }
- }
-
- pushedBack.clear();
- long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
- for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
- min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
- pushedBack.add(pushedBackValue);
- }
- setPushedBackWatermark(min);
-
- pushbackDoFnRunner.finishBundle();
-
- // maybe output a new watermark
- processWatermark1(new Watermark(currentInputWatermark));
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- processWatermark1(mark);
- }
-
- @Override
- public void processWatermark1(Watermark mark) throws Exception {
- if (keyCoder == null) {
- this.currentInputWatermark = mark.getTimestamp();
- long potentialOutputWatermark =
- Math.min(getPushbackWatermarkHold(), currentInputWatermark);
- if (potentialOutputWatermark > currentOutputWatermark) {
- currentOutputWatermark = potentialOutputWatermark;
- output.emitWatermark(new Watermark(currentOutputWatermark));
- }
- } else {
- // fireTimers, so we need startBundle.
- pushbackDoFnRunner.startBundle();
-
- this.currentInputWatermark = mark.getTimestamp();
-
- // hold back by the pushed back values waiting for side inputs
- long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
-
- timerService.advanceWatermark(actualInputWatermark);
-
- Instant watermarkHold = stateInternals.watermarkHold();
-
- long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());
-
- long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold);
-
- if (potentialOutputWatermark > currentOutputWatermark) {
- currentOutputWatermark = potentialOutputWatermark;
- output.emitWatermark(new Watermark(currentOutputWatermark));
- }
- pushbackDoFnRunner.finishBundle();
- }
- }
-
- @Override
- public void processWatermark2(Watermark mark) throws Exception {
- // ignore watermarks from the side-input input
- }
-
- @Override
- public void snapshotState(StateSnapshotContext context) throws Exception {
- // copy from AbstractStreamOperator
- if (getKeyedStateBackend() != null) {
- KeyedStateCheckpointOutputStream out;
-
- try {
- out = context.getRawKeyedOperatorStateOutput();
- } catch (Exception exception) {
- throw new Exception("Could not open raw keyed operator state stream for "
- + getOperatorName() + '.', exception);
- }
-
- try {
- KeyGroupsList allKeyGroups = out.getKeyGroupList();
- for (int keyGroupIdx : allKeyGroups) {
- out.startNewKeyGroup(keyGroupIdx);
-
- DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
-
- // if (this instanceof KeyGroupCheckpointedOperator)
- snapshotKeyGroupState(keyGroupIdx, dov);
-
- // We can't get all timerServices, so we just snapshot our timerService
- // Maybe this is a normal DoFn that has no timerService
- if (keyCoder != null) {
- timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx);
- }
-
- }
- } catch (Exception exception) {
- throw new Exception("Could not write timer service of " + getOperatorName()
- + " to checkpoint state stream.", exception);
- } finally {
- try {
- out.close();
- } catch (Exception closeException) {
- LOG.warn("Could not close raw keyed operator state stream for {}. This "
- + "might have prevented deleting some state data.", getOperatorName(),
- closeException);
- }
- }
- }
- }
-
- @Override
- public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
- if (!sideInputs.isEmpty() && keyCoder != null) {
- ((FlinkKeyGroupStateInternals) pushbackStateInternals).snapshotKeyGroupState(
- keyGroupIndex, out);
- }
- }
-
- @Override
- public void initializeState(StateInitializationContext context) throws Exception {
- if (getKeyedStateBackend() != null) {
- int totalKeyGroups = getKeyedStateBackend().getNumberOfKeyGroups();
- KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange();
-
- for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) {
- DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream());
-
- int keyGroupIdx = streamProvider.getKeyGroupId();
- checkArgument(localKeyGroupRange.contains(keyGroupIdx),
- "Key Group " + keyGroupIdx + " does not belong to the local range.");
-
- // if (this instanceof KeyGroupRestoringOperator)
- restoreKeyGroupState(keyGroupIdx, div);
-
- // We just initialize our timerService
- if (keyCoder != null) {
- if (timerService == null) {
- timerService = new HeapInternalTimerService<>(
- totalKeyGroups,
- localKeyGroupRange,
- this,
- getRuntimeContext().getProcessingTimeService());
- }
- timerService.restoreTimersForKeyGroup(div, keyGroupIdx, getUserCodeClassloader());
- }
- }
- }
- }
-
- @Override
- public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception {
- if (!sideInputs.isEmpty() && keyCoder != null) {
- if (pushbackStateInternals == null) {
- pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
- getKeyedStateBackend());
- }
- ((FlinkKeyGroupStateInternals) pushbackStateInternals)
- .restoreKeyGroupState(keyGroupIndex, in, getUserCodeClassloader());
- }
- }
-
- @Override
- public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception {
- fireTimer(timer);
- }
-
- @Override
- public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception {
- fireTimer(timer);
- }
-
- // allow overriding this in WindowDoFnOperator
- public void fireTimer(InternalTimer<?, TimerData> timer) {
- TimerInternals.TimerData timerData = timer.getNamespace();
- StateNamespace namespace = timerData.getNamespace();
- // This is a user timer, so namespace must be WindowNamespace
- checkArgument(namespace instanceof WindowNamespace);
- BoundedWindow window = ((WindowNamespace) namespace).getWindow();
- pushbackDoFnRunner.onTimer(timerData.getTimerId(), window,
- timerData.getTimestamp(), timerData.getDomain());
- }
-
- /**
- * Factory for creating an {@link DoFnRunners.OutputManager} from
- * a Flink {@link Output}.
- */
- interface OutputManagerFactory<OutputT> extends Serializable {
- DoFnRunners.OutputManager create(Output<StreamRecord<OutputT>> output);
- }
-
- /**
- * Default implementation of {@link OutputManagerFactory} that creates an
- * {@link DoFnRunners.OutputManager} that only writes to
- * a single logical output.
- */
- public static class DefaultOutputManagerFactory<OutputT>
- implements OutputManagerFactory<OutputT> {
- @Override
- public DoFnRunners.OutputManager create(final Output<StreamRecord<OutputT>> output) {
- return new DoFnRunners.OutputManager() {
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
- // with tagged outputs we can't get around this because we don't
- // know our own output type...
- @SuppressWarnings("unchecked")
- OutputT castValue = (OutputT) value;
- output.collect(new StreamRecord<>(castValue));
- }
- };
- }
- }
-
- /**
- * Implementation of {@link OutputManagerFactory} that creates an
- * {@link DoFnRunners.OutputManager} that can write to multiple logical
- * outputs by unioning them in a {@link RawUnionValue}.
- */
- public static class MultiOutputOutputManagerFactory
- implements OutputManagerFactory<RawUnionValue> {
-
- Map<TupleTag<?>, Integer> mapping;
-
- public MultiOutputOutputManagerFactory(Map<TupleTag<?>, Integer> mapping) {
- this.mapping = mapping;
- }
-
- @Override
- public DoFnRunners.OutputManager create(final Output<StreamRecord<RawUnionValue>> output) {
- return new DoFnRunners.OutputManager() {
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
- int intTag = mapping.get(tag);
- output.collect(new StreamRecord<>(new RawUnionValue(intTag, value)));
- }
- };
- }
- }
-
- /**
- * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow
- * accessing state or timer internals.
- */
- protected class StepContext implements ExecutionContext.StepContext {
-
- @Override
- public String getStepName() {
- return null;
- }
-
- @Override
- public String getTransformName() {
- return null;
- }
-
- @Override
- public void noteOutput(WindowedValue<?> output) {}
-
- @Override
- public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {}
-
- @Override
- public <T, W extends BoundedWindow> void writePCollectionViewData(
- TupleTag<?> tag,
- Iterable<WindowedValue<T>> data,
- Coder<Iterable<WindowedValue<T>>> dataCoder,
- W window,
- Coder<W> windowCoder) throws IOException {
- throw new UnsupportedOperationException("Writing side-input data is not supported.");
- }
-
- @Override
- public StateInternals<?> stateInternals() {
- return stateInternals;
- }
-
- @Override
- public TimerInternals timerInternals() {
- return timerInternals;
- }
- }
-
- private class FlinkTimerInternals implements TimerInternals {
-
- @Override
- public void setTimer(
- StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
- setTimer(TimerData.of(timerId, namespace, target, timeDomain));
- }
-
- @Deprecated
- @Override
- public void setTimer(TimerData timerKey) {
- long time = timerKey.getTimestamp().getMillis();
- if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
- timerService.registerEventTimeTimer(timerKey, time);
- } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
- timerService.registerProcessingTimeTimer(timerKey, time);
- } else {
- throw new UnsupportedOperationException(
- "Unsupported time domain: " + timerKey.getDomain());
- }
- }
-
- @Deprecated
- @Override
- public void deleteTimer(StateNamespace namespace, String timerId) {
- throw new UnsupportedOperationException(
- "Canceling of a timer by ID is not yet supported.");
- }
-
- @Override
- public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
- throw new UnsupportedOperationException(
- "Canceling of a timer by ID is not yet supported.");
- }
-
- @Deprecated
- @Override
- public void deleteTimer(TimerData timerKey) {
- long time = timerKey.getTimestamp().getMillis();
- if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
- timerService.deleteEventTimeTimer(timerKey, time);
- } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
- timerService.deleteProcessingTimeTimer(timerKey, time);
- } else {
- throw new UnsupportedOperationException(
- "Unsupported time domain: " + timerKey.getDomain());
- }
- }
-
- @Override
- public Instant currentProcessingTime() {
- return new Instant(timerService.currentProcessingTime());
- }
-
- @Nullable
- @Override
- public Instant currentSynchronizedProcessingTime() {
- return new Instant(timerService.currentProcessingTime());
- }
-
- @Override
- public Instant currentInputWatermarkTime() {
- return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold()));
- }
-
- @Nullable
- @Override
- public Instant currentOutputWatermarkTime() {
- return new Instant(currentOutputWatermark);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
deleted file mode 100644
index dce2e68..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import java.nio.ByteBuffer;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
-/**
- * {@link KeySelector} that retrieves a key from a {@link KV}. This will return
- * the key as encoded by the provided {@link Coder} in a {@link ByteBuffer}. This ensures
- * that all key comparisons/hashing happen on the encoded form.
- */
-public class KvToByteBufferKeySelector<K, V>
- implements KeySelector<WindowedValue<KV<K, V>>, ByteBuffer>,
- ResultTypeQueryable<ByteBuffer> {
-
- private final Coder<K> keyCoder;
-
- public KvToByteBufferKeySelector(Coder<K> keyCoder) {
- this.keyCoder = keyCoder;
- }
-
- @Override
- public ByteBuffer getKey(WindowedValue<KV<K, V>> value) throws Exception {
- K key = value.getValue().getKey();
- byte[] keyBytes = CoderUtils.encodeToByteArray(keyCoder, key);
- return ByteBuffer.wrap(keyBytes);
- }
-
- @Override
- public TypeInformation<ByteBuffer> getProducedType() {
- return new GenericTypeInfo<>(ByteBuffer.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
deleted file mode 100644
index e843660..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import java.util.Collections;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * Singleton keyed word item.
- */
-public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
-
- final K key;
- final WindowedValue<ElemT> value;
-
- public SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) {
- this.key = key;
- this.value = value;
- }
-
- @Override
- public K key() {
- return key;
- }
-
- public WindowedValue<ElemT> value() {
- return value;
- }
-
- @Override
- public Iterable<TimerInternals.TimerData> timersIterable() {
- return Collections.EMPTY_LIST;
- }
-
- @Override
- public Iterable<WindowedValue<ElemT>> elementsIterable() {
- return Collections.singletonList(value);
- }
-}