You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/11/10 09:38:41 UTC

[1/2] incubator-beam git commit: [BEAM-931] Fix Findbugs Warnings in Flink Runner

Repository: incubator-beam
Updated Branches:
  refs/heads/master ef750c0f8 -> 66faf74d0


[BEAM-931] Fix Findbugs Warnings in Flink Runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a609a19e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a609a19e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a609a19e

Branch: refs/heads/master
Commit: a609a19e6df763c0aa77d83f05e21ec343f6dcdb
Parents: ef750c0
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Nov 8 11:56:45 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 10 10:29:43 2016 +0100

----------------------------------------------------------------------
 .../org/apache/beam/runners/flink/examples/TFIDF.java    | 11 +++++++++--
 runners/flink/pom.xml                                    |  8 --------
 .../java/org/apache/beam/runners/flink/FlinkRunner.java  |  4 ++--
 .../translation/FlinkStreamingTransformTranslators.java  |  2 --
 .../wrappers/SerializableFnAggregatorWrapper.java        |  7 +++++++
 .../wrappers/streaming/WindowDoFnOperator.java           |  2 +-
 .../wrappers/streaming/io/UnboundedSocketSource.java     |  5 +----
 .../wrappers/streaming/io/UnboundedSourceWrapper.java    |  3 ++-
 8 files changed, 22 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index cf5c8f5..b946d98 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -129,7 +129,12 @@ public class TFIDF {
     Set<URI> uris = new HashSet<>();
     if (absoluteUri.getScheme().equals("file")) {
       File directory = new File(absoluteUri);
-      for (String entry : directory.list()) {
+      String[] directoryListing = directory.list();
+      if (directoryListing == null) {
+        throw new IOException(
+            "Directory " + absoluteUri + " is not a valid path or IO Error occurred.");
+      }
+      for (String entry : directoryListing) {
         File path = new File(directory, entry);
         uris.add(path.toURI());
       }
@@ -157,7 +162,9 @@ public class TFIDF {
       extends PTransform<PBegin, PCollection<KV<URI, String>>> {
     private static final long serialVersionUID = 0;
 
-    private Iterable<URI> uris;
+    // transient because PTransform is not really meant to be serialized.
+    // see note on PTransform
+    private final transient Iterable<URI> uris;
 
     public ReadDocuments(Iterable<URI> uris) {
       this.uris = uris;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 1b73922..f93af85 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -96,14 +96,6 @@
           </executions>
         </plugin>
 
-        <!-- BEAM-931 -->
-        <plugin>
-          <groupId>org.codehaus.mojo</groupId>
-          <artifactId>findbugs-maven-plugin</artifactId>
-          <configuration>
-            <skip>true</skip>
-          </configuration>
-        </plugin>
       </plugins>
     </pluginManagement>
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 12e21c7..488c170 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -301,7 +301,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
   private static class StreamingViewAsMap<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
 
-    private final FlinkRunner runner;
+    private final transient FlinkRunner runner;
 
     @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
     public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) {
@@ -343,7 +343,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
   private static class StreamingViewAsMultimap<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
 
-    private final FlinkRunner runner;
+    private final transient FlinkRunner runner;
 
     /**
      * Builds an instance of this class from the overridden transform.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 4b819b7..069162f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -249,7 +249,6 @@ public class FlinkStreamingTransformTranslators {
               }});
       } else {
         try {
-          transform.getSource();
           UnboundedSourceWrapper<T, ?> sourceWrapper =
               new UnboundedSourceWrapper<>(
                   context.getPipelineOptions(),
@@ -279,7 +278,6 @@ public class FlinkStreamingTransformTranslators {
 
       DataStream<WindowedValue<T>> source;
       try {
-        transform.getSource();
         BoundedSourceWrapper<T> sourceWrapper =
             new BoundedSourceWrapper<>(
                 context.getPipelineOptions(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
index 25d777a..70d97e3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
@@ -80,6 +80,13 @@ public class SerializableFnAggregatorWrapper<InputT, OutputT>
 
   @Override
   public Accumulator<InputT, Serializable> clone() {
+    try {
+      super.clone();
+    } catch (CloneNotSupportedException e) {
+      // Flink Accumulators cannot throw CloneNotSupportedException, work around that.
+      throw new RuntimeException(e);
+    }
+
     // copy it by merging
     OutputT resultCopy = combiner.apply(Lists.newArrayList((InputT) aa));
     SerializableFnAggregatorWrapper<InputT, OutputT> result = new

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 5debd4b..432dc64 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -92,7 +92,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
   private transient Multiset<Long> processingTimeTimerTimestamps;
   private transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures;
 
-  private FlinkStateInternals<K> stateInternals;
+  private transient FlinkStateInternals<K> stateInternals;
 
   private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
index 96b5138..ed03dda 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.Collections;
@@ -131,10 +130,8 @@ public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.Check
   /**
    * Unbounded socket reader.
    */
-  public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String>
-      implements Serializable {
+  public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> {
 
-    private static final long serialVersionUID = 7526472295622776147L;
     private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
 
     private final UnboundedSocketSource source;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 68a83e8..af955ba 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -240,7 +240,8 @@ public class UnboundedSourceWrapper<
           // Flink will interrupt us at some point
           //noinspection SynchronizationOnLocalVariableOrMethodParameter
           synchronized (waitLock) {
-            waitLock.wait();
+            // don't wait indefinitely, in case something goes horribly wrong
+            waitLock.wait(1000);
           }
         } catch (InterruptedException e) {
           if (!isRunning) {


[2/2] incubator-beam git commit: This closes #1311

Posted by al...@apache.org.
This closes #1311


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/66faf74d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/66faf74d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/66faf74d

Branch: refs/heads/master
Commit: 66faf74d084e4eba982063c9ca81673b0cfece4b
Parents: ef750c0 a609a19
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Nov 10 10:37:46 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 10 10:37:46 2016 +0100

----------------------------------------------------------------------
 .../org/apache/beam/runners/flink/examples/TFIDF.java    | 11 +++++++++--
 runners/flink/pom.xml                                    |  8 --------
 .../java/org/apache/beam/runners/flink/FlinkRunner.java  |  4 ++--
 .../translation/FlinkStreamingTransformTranslators.java  |  2 --
 .../wrappers/SerializableFnAggregatorWrapper.java        |  7 +++++++
 .../wrappers/streaming/WindowDoFnOperator.java           |  2 +-
 .../wrappers/streaming/io/UnboundedSocketSource.java     |  5 +----
 .../wrappers/streaming/io/UnboundedSourceWrapper.java    |  3 ++-
 8 files changed, 22 insertions(+), 20 deletions(-)
----------------------------------------------------------------------