You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/09/08 08:12:55 UTC
[1/4] incubator-beam git commit: [BEAM-616] Update Flink Runner to Flink 1.1.2
Repository: incubator-beam
Updated Branches:
refs/heads/master f33296c7f -> fb322cc73
[BEAM-616] Update Flink Runner to Flink 1.1.2
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c66caf33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c66caf33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c66caf33
Branch: refs/heads/master
Commit: c66caf3364ae5c20bdf0fbf41a8ef61d4e53c495
Parents: f33296c
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Sep 5 18:17:11 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 8 10:10:50 2016 +0200
----------------------------------------------------------------------
runners/flink/pom.xml | 2 +-
runners/flink/runner/pom.xml | 8 +++++++
.../wrappers/streaming/DoFnOperator.java | 24 ++++++++++++--------
.../wrappers/streaming/FlinkStateInternals.java | 8 ++++---
.../wrappers/streaming/WindowDoFnOperator.java | 4 ++--
5 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c66caf33/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index b2f3aaa..68e82d2 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -39,7 +39,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <flink.version>1.0.3</flink.version>
+ <flink.version>1.1.2</flink.version>
</properties>
<repositories>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c66caf33/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 7c32280..8759591 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -127,6 +127,14 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.10</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<!-- Beam -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c66caf33/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 3b917e2..79aab9c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -199,7 +199,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
if (restoredSideInputState != null) {
@SuppressWarnings("unchecked,rawtypes")
HashMap<String, KvStateSnapshot> castRestored = (HashMap) restoredSideInputState;
- sideInputStateBackend.injectKeyValueStateSnapshots(castRestored, 0L);
+ sideInputStateBackend.injectKeyValueStateSnapshots(castRestored);
restoredSideInputState = null;
}
@@ -306,15 +306,19 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
pushedBackDescriptor);
List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
- for (WindowedValue<InputT> elem: pushedBack.get()) {
- // we need to set the correct key in case the operator is
- // a (keyed) window operator
- setKeyContextElement1(new StreamRecord<>(elem));
+ Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.get();
+ if (pushedBackContents != null) {
+ for (WindowedValue<InputT> elem : pushedBackContents) {
- Iterable<WindowedValue<InputT>> justPushedBack =
- pushbackDoFnRunner.processElementInReadyWindows(elem);
- Iterables.addAll(newPushedBack, justPushedBack);
+ // we need to set the correct key in case the operator is
+ // a (keyed) window operator
+ setKeyContextElement1(new StreamRecord<>(elem));
+
+ Iterable<WindowedValue<InputT>> justPushedBack =
+ pushbackDoFnRunner.processElementInReadyWindows(elem);
+ Iterables.addAll(newPushedBack, justPushedBack);
+ }
}
@@ -385,8 +389,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
}
@Override
- public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
- super.restoreState(state, recoveryTimestamp);
+ public void restoreState(StreamTaskState state) throws Exception {
+ super.restoreState(state);
@SuppressWarnings("unchecked,rawtypes")
StateHandle<HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>>> sideInputStateHandle =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c66caf33/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
index 2e10400..27e3dc6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
@@ -307,10 +307,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public Iterable<T> read() {
try {
- return flinkStateBackend.getPartitionedState(
+ Iterable<T> result = flinkStateBackend.getPartitionedState(
namespace.stringKey(),
StringSerializer.INSTANCE,
flinkStateDescriptor).get();
+
+ return result != null ? result : Collections.<T>emptyList();
} catch (Exception e) {
throw new RuntimeException("Error reading state.", e);
}
@@ -326,7 +328,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
namespace.stringKey(),
StringSerializer.INSTANCE,
flinkStateDescriptor).get();
- return Iterables.isEmpty(result);
+ return result == null;
} catch (Exception e) {
throw new RuntimeException("Error reading state.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c66caf33/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 29ae6ae..8b3365d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -248,8 +248,8 @@ public class WindowDoFnOperator<K, InputT, OutputT>
}
@Override
- public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
- super.restoreState(state, recoveryTimestamp);
+ public void restoreState(StreamTaskState state) throws Exception {
+ super.restoreState(state);
@SuppressWarnings("unchecked")
StateHandle<DataInputView> operatorState =
[3/4] incubator-beam git commit: Fix shaded imports in Flink Runner
Posted by al...@apache.org.
Fix shaded imports in Flink Runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/27259599
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/27259599
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/27259599
Branch: refs/heads/master
Commit: 2725959996d131b6d189f7f62df3d05e0d361ba0
Parents: 31d09eb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Sep 5 10:50:14 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 8 10:12:06 2016 +0200
----------------------------------------------------------------------
.../flink/translation/wrappers/streaming/DoFnOperator.java | 2 +-
.../beam/runners/flink/streaming/DoFnOperatorTest.java | 9 +++++----
2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27259599/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 79aab9c..ee23ae4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming;
-import avro.shaded.com.google.common.base.Preconditions;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.Serializable;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27259599/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 5f1b066..913fb8b 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -20,7 +20,12 @@ package org.apache.beam.runners.flink.streaming;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
import static org.junit.Assert.assertThat;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
import java.util.Collections;
import java.util.HashMap;
import javax.annotation.Nullable;
@@ -40,10 +45,6 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.shaded.com.google.common.base.Function;
-import org.apache.flink.shaded.com.google.common.base.Predicate;
-import org.apache.flink.shaded.com.google.common.collect.FluentIterable;
-import org.apache.flink.shaded.com.google.common.collect.ImmutableMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
[2/4] incubator-beam git commit: Merge branch 'flink-1.1.2'
Posted by al...@apache.org.
Merge branch 'flink-1.1.2'
This closes #921
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/31d09eb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/31d09eb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/31d09eb4
Branch: refs/heads/master
Commit: 31d09eb45c4fcd5f88012145e64734785fe4c022
Parents: f33296c c66caf3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 8 10:11:02 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 8 10:11:02 2016 +0200
----------------------------------------------------------------------
runners/flink/pom.xml | 2 +-
runners/flink/runner/pom.xml | 8 +++++++
.../wrappers/streaming/DoFnOperator.java | 24 ++++++++++++--------
.../wrappers/streaming/FlinkStateInternals.java | 8 ++++---
.../wrappers/streaming/WindowDoFnOperator.java | 4 ++--
5 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
[4/4] incubator-beam git commit: Merge branch 'flink-fix-imports'
Posted by al...@apache.org.
Merge branch 'flink-fix-imports'
This closes #922
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fb322cc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fb322cc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fb322cc7
Branch: refs/heads/master
Commit: fb322cc73aac6ed42b45371751388270d4f0db16
Parents: 31d09eb 2725959
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 8 10:12:15 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 8 10:12:15 2016 +0200
----------------------------------------------------------------------
.../flink/translation/wrappers/streaming/DoFnOperator.java | 2 +-
.../beam/runners/flink/streaming/DoFnOperatorTest.java | 9 +++++----
2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------