You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/11/10 09:38:41 UTC
[1/2] incubator-beam git commit: [BEAM-931] Fix Findbugs Warnings in Flink Runner
Repository: incubator-beam
Updated Branches:
refs/heads/master ef750c0f8 -> 66faf74d0
[BEAM-931] Fix Findbugs Warnings in Flink Runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a609a19e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a609a19e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a609a19e
Branch: refs/heads/master
Commit: a609a19e6df763c0aa77d83f05e21ec343f6dcdb
Parents: ef750c0
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Nov 8 11:56:45 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 10 10:29:43 2016 +0100
----------------------------------------------------------------------
.../org/apache/beam/runners/flink/examples/TFIDF.java | 11 +++++++++--
runners/flink/pom.xml | 8 --------
.../java/org/apache/beam/runners/flink/FlinkRunner.java | 4 ++--
.../translation/FlinkStreamingTransformTranslators.java | 2 --
.../wrappers/SerializableFnAggregatorWrapper.java | 7 +++++++
.../wrappers/streaming/WindowDoFnOperator.java | 2 +-
.../wrappers/streaming/io/UnboundedSocketSource.java | 5 +----
.../wrappers/streaming/io/UnboundedSourceWrapper.java | 3 ++-
8 files changed, 22 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index cf5c8f5..b946d98 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -129,7 +129,12 @@ public class TFIDF {
Set<URI> uris = new HashSet<>();
if (absoluteUri.getScheme().equals("file")) {
File directory = new File(absoluteUri);
- for (String entry : directory.list()) {
+ String[] directoryListing = directory.list();
+ if (directoryListing == null) {
+ throw new IOException(
+ "Directory " + absoluteUri + " is not a valid path or IO Error occurred.");
+ }
+ for (String entry : directoryListing) {
File path = new File(directory, entry);
uris.add(path.toURI());
}
@@ -157,7 +162,9 @@ public class TFIDF {
extends PTransform<PBegin, PCollection<KV<URI, String>>> {
private static final long serialVersionUID = 0;
- private Iterable<URI> uris;
+ // transient because PTransform is not really meant to be serialized.
+ // see note on PTransform
+ private final transient Iterable<URI> uris;
public ReadDocuments(Iterable<URI> uris) {
this.uris = uris;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 1b73922..f93af85 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -96,14 +96,6 @@
</executions>
</plugin>
- <!-- BEAM-931 -->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>findbugs-maven-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
</plugins>
</pluginManagement>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 12e21c7..488c170 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -301,7 +301,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
private static class StreamingViewAsMap<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
- private final FlinkRunner runner;
+ private final transient FlinkRunner runner;
@SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) {
@@ -343,7 +343,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
private static class StreamingViewAsMultimap<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
- private final FlinkRunner runner;
+ private final transient FlinkRunner runner;
/**
* Builds an instance of this class from the overridden transform.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 4b819b7..069162f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -249,7 +249,6 @@ public class FlinkStreamingTransformTranslators {
}});
} else {
try {
- transform.getSource();
UnboundedSourceWrapper<T, ?> sourceWrapper =
new UnboundedSourceWrapper<>(
context.getPipelineOptions(),
@@ -279,7 +278,6 @@ public class FlinkStreamingTransformTranslators {
DataStream<WindowedValue<T>> source;
try {
- transform.getSource();
BoundedSourceWrapper<T> sourceWrapper =
new BoundedSourceWrapper<>(
context.getPipelineOptions(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
index 25d777a..70d97e3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
@@ -80,6 +80,13 @@ public class SerializableFnAggregatorWrapper<InputT, OutputT>
@Override
public Accumulator<InputT, Serializable> clone() {
+ try {
+ super.clone();
+ } catch (CloneNotSupportedException e) {
+ // Flink Accumulators cannot throw CloneNotSupportedException, work around that.
+ throw new RuntimeException(e);
+ }
+
// copy it by merging
OutputT resultCopy = combiner.apply(Lists.newArrayList((InputT) aa));
SerializableFnAggregatorWrapper<InputT, OutputT> result = new
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 5debd4b..432dc64 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -92,7 +92,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
private transient Multiset<Long> processingTimeTimerTimestamps;
private transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures;
- private FlinkStateInternals<K> stateInternals;
+ private transient FlinkStateInternals<K> stateInternals;
private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
index 96b5138..ed03dda 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
@@ -131,10 +130,8 @@ public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.Check
/**
* Unbounded socket reader.
*/
- public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String>
- implements Serializable {
+ public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> {
- private static final long serialVersionUID = 7526472295622776147L;
private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
private final UnboundedSocketSource source;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 68a83e8..af955ba 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -240,7 +240,8 @@ public class UnboundedSourceWrapper<
// Flink will interrupt us at some point
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (waitLock) {
- waitLock.wait();
+ // don't wait indefinitely, in case something goes horribly wrong
+ waitLock.wait(1000);
}
} catch (InterruptedException e) {
if (!isRunning) {
[2/2] incubator-beam git commit: This closes #1311
Posted by al...@apache.org.
This closes #1311
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/66faf74d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/66faf74d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/66faf74d
Branch: refs/heads/master
Commit: 66faf74d084e4eba982063c9ca81673b0cfece4b
Parents: ef750c0 a609a19
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Nov 10 10:37:46 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 10 10:37:46 2016 +0100
----------------------------------------------------------------------
.../org/apache/beam/runners/flink/examples/TFIDF.java | 11 +++++++++--
runners/flink/pom.xml | 8 --------
.../java/org/apache/beam/runners/flink/FlinkRunner.java | 4 ++--
.../translation/FlinkStreamingTransformTranslators.java | 2 --
.../wrappers/SerializableFnAggregatorWrapper.java | 7 +++++++
.../wrappers/streaming/WindowDoFnOperator.java | 2 +-
.../wrappers/streaming/io/UnboundedSocketSource.java | 5 +----
.../wrappers/streaming/io/UnboundedSourceWrapper.java | 3 ++-
8 files changed, 22 insertions(+), 20 deletions(-)
----------------------------------------------------------------------