You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/09/08 08:12:55 UTC

[1/4] incubator-beam git commit: [BEAM-616] Update Flink Runner to Flink 1.1.2

Repository: incubator-beam
Updated Branches:
  refs/heads/master f33296c7f -> fb322cc73


[BEAM-616] Update Flink Runner to Flink 1.1.2


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c66caf33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c66caf33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c66caf33

Branch: refs/heads/master
Commit: c66caf3364ae5c20bdf0fbf41a8ef61d4e53c495
Parents: f33296c
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Sep 5 18:17:11 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 8 10:10:50 2016 +0200

----------------------------------------------------------------------
 runners/flink/pom.xml                           |  2 +-
 runners/flink/runner/pom.xml                    |  8 +++++++
 .../wrappers/streaming/DoFnOperator.java        | 24 ++++++++++++--------
 .../wrappers/streaming/FlinkStateInternals.java |  8 ++++---
 .../wrappers/streaming/WindowDoFnOperator.java  |  4 ++--
 5 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c66caf33/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index b2f3aaa..68e82d2 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -39,7 +39,7 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    <flink.version>1.0.3</flink.version>
+    <flink.version>1.1.2</flink.version>
   </properties>
 
   <repositories>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c66caf33/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 7c32280..8759591 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -127,6 +127,14 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime_2.10</artifactId>
+      <version>${flink.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
 
     <!-- Beam -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c66caf33/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 3b917e2..79aab9c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -199,7 +199,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       if (restoredSideInputState != null) {
         @SuppressWarnings("unchecked,rawtypes")
         HashMap<String, KvStateSnapshot> castRestored = (HashMap) restoredSideInputState;
-        sideInputStateBackend.injectKeyValueStateSnapshots(castRestored, 0L);
+        sideInputStateBackend.injectKeyValueStateSnapshots(castRestored);
         restoredSideInputState = null;
       }
 
@@ -306,15 +306,19 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
             pushedBackDescriptor);
 
     List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
-    for (WindowedValue<InputT> elem: pushedBack.get()) {
 
-      // we need to set the correct key in case the operator is
-      // a (keyed) window operator
-      setKeyContextElement1(new StreamRecord<>(elem));
+    Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.get();
+    if (pushedBackContents != null) {
+      for (WindowedValue<InputT> elem : pushedBackContents) {
 
-      Iterable<WindowedValue<InputT>> justPushedBack =
-          pushbackDoFnRunner.processElementInReadyWindows(elem);
-      Iterables.addAll(newPushedBack, justPushedBack);
+        // we need to set the correct key in case the operator is
+        // a (keyed) window operator
+        setKeyContextElement1(new StreamRecord<>(elem));
+
+        Iterable<WindowedValue<InputT>> justPushedBack =
+            pushbackDoFnRunner.processElementInReadyWindows(elem);
+        Iterables.addAll(newPushedBack, justPushedBack);
+      }
     }
 
 
@@ -385,8 +389,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   }
 
   @Override
-  public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
-    super.restoreState(state, recoveryTimestamp);
+  public void restoreState(StreamTaskState state) throws Exception {
+    super.restoreState(state);
 
     @SuppressWarnings("unchecked,rawtypes")
     StateHandle<HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>>> sideInputStateHandle =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c66caf33/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
index 2e10400..27e3dc6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
@@ -307,10 +307,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     @Override
     public Iterable<T> read() {
       try {
-        return flinkStateBackend.getPartitionedState(
+        Iterable<T> result = flinkStateBackend.getPartitionedState(
             namespace.stringKey(),
             StringSerializer.INSTANCE,
             flinkStateDescriptor).get();
+
+        return result != null ? result : Collections.<T>emptyList();
       } catch (Exception e) {
         throw new RuntimeException("Error reading state.", e);
       }
@@ -326,7 +328,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
                 namespace.stringKey(),
                 StringSerializer.INSTANCE,
                 flinkStateDescriptor).get();
-            return Iterables.isEmpty(result);
+            return result == null;
           } catch (Exception e) {
             throw new RuntimeException("Error reading state.", e);
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c66caf33/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 29ae6ae..8b3365d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -248,8 +248,8 @@ public class WindowDoFnOperator<K, InputT, OutputT>
   }
 
   @Override
-  public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
-    super.restoreState(state, recoveryTimestamp);
+  public void restoreState(StreamTaskState state) throws Exception {
+    super.restoreState(state);
 
     @SuppressWarnings("unchecked")
     StateHandle<DataInputView> operatorState =


[3/4] incubator-beam git commit: Fix shaded imports in Flink Runner

Posted by al...@apache.org.
Fix shaded imports in Flink Runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/27259599
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/27259599
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/27259599

Branch: refs/heads/master
Commit: 2725959996d131b6d189f7f62df3d05e0d361ba0
Parents: 31d09eb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Sep 5 10:50:14 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 8 10:12:06 2016 +0200

----------------------------------------------------------------------
 .../flink/translation/wrappers/streaming/DoFnOperator.java  | 2 +-
 .../beam/runners/flink/streaming/DoFnOperatorTest.java      | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27259599/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 79aab9c..ee23ae4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
-import avro.shaded.com.google.common.base.Preconditions;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27259599/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 5f1b066..913fb8b 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -20,7 +20,12 @@ package org.apache.beam.runners.flink.streaming;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertThat;
 
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
 import java.util.Collections;
 import java.util.HashMap;
 import javax.annotation.Nullable;
@@ -40,10 +45,6 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.shaded.com.google.common.base.Function;
-import org.apache.flink.shaded.com.google.common.base.Predicate;
-import org.apache.flink.shaded.com.google.common.collect.FluentIterable;
-import org.apache.flink.shaded.com.google.common.collect.ImmutableMap;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;


[2/4] incubator-beam git commit: Merge branch 'flink-1.1.2'

Posted by al...@apache.org.
Merge branch 'flink-1.1.2'

This closes #921


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/31d09eb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/31d09eb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/31d09eb4

Branch: refs/heads/master
Commit: 31d09eb45c4fcd5f88012145e64734785fe4c022
Parents: f33296c c66caf3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 8 10:11:02 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 8 10:11:02 2016 +0200

----------------------------------------------------------------------
 runners/flink/pom.xml                           |  2 +-
 runners/flink/runner/pom.xml                    |  8 +++++++
 .../wrappers/streaming/DoFnOperator.java        | 24 ++++++++++++--------
 .../wrappers/streaming/FlinkStateInternals.java |  8 ++++---
 .../wrappers/streaming/WindowDoFnOperator.java  |  4 ++--
 5 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------



[4/4] incubator-beam git commit: Merge branch 'flink-fix-imports'

Posted by al...@apache.org.
Merge branch 'flink-fix-imports'

This closes #922


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fb322cc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fb322cc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fb322cc7

Branch: refs/heads/master
Commit: fb322cc73aac6ed42b45371751388270d4f0db16
Parents: 31d09eb 2725959
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 8 10:12:15 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 8 10:12:15 2016 +0200

----------------------------------------------------------------------
 .../flink/translation/wrappers/streaming/DoFnOperator.java  | 2 +-
 .../beam/runners/flink/streaming/DoFnOperatorTest.java      | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------