You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/10/11 14:30:38 UTC

[1/2] flink git commit: [docs] fix typo in Scala functions type descriptors

Repository: flink
Updated Branches:
  refs/heads/master 86080bb97 -> 5dfc897be


[docs] fix typo in Scala functions type descriptors

Closes #1253


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2475c82d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2475c82d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2475c82d

Branch: refs/heads/master
Commit: 2475c82d425b4a6a564f0d38f352a0f3b8753d72
Parents: 86080bb
Author: Viktor Taranenko <vi...@gmail.com>
Authored: Sun Oct 11 11:49:13 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Sun Oct 11 14:26:45 2015 +0200

----------------------------------------------------------------------
 docs/internals/types_serialization.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2475c82d/docs/internals/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/internals/types_serialization.md b/docs/internals/types_serialization.md
index 19260f1..8a93ccd 100644
--- a/docs/internals/types_serialization.md
+++ b/docs/internals/types_serialization.md
@@ -127,7 +127,7 @@ Another common cause are generic methods, which can be fixed as described in the
 Consider the following case below:
 
 {% highlight scala %}
-def[T] selectFirst(input: DataSet[(T, _)]) : DataSet[T] = {
+def selectFirst[T](input: DataSet[(T, _)]) : DataSet[T] = {
   input.map { v => v._1 }
 }
 
@@ -148,7 +148,7 @@ information will then be generated at the sites where the method is invoked, rat
 method is defined.
 
 {% highlight scala %}
-def[T : TypeInformation] selectFirst(input: DataSet[(T, _)]) : DataSet[T] = {
+def selectFirst[T : TypeInformation](input: DataSet[(T, _)]) : DataSet[T] = {
   input.map { v => v._1 }
 }
 {% endhighlight %}


[2/2] flink git commit: [FLINK-2817] [streaming] FileMonitoring function logs on empty location

Posted by mb...@apache.org.
[FLINK-2817] [streaming] FileMonitoring function logs on empty location

Log a warning instead of throwing a NullPointerException when the monitored location does not exist.

Closes #1251


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5dfc897b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5dfc897b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5dfc897b

Branch: refs/heads/master
Commit: 5dfc897beb99d2d8f6d7becba972ff5756b3fb19
Parents: 2475c82
Author: ehnalis <zo...@gmail.com>
Authored: Sat Oct 10 17:58:41 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sun Oct 11 14:29:09 2015 +0200

----------------------------------------------------------------------
 .../source/FileMonitoringFunction.java          | 35 ++++++-----
 .../source/FileMonitoringFunctionTest.java      | 63 ++++++++++++++++++++
 2 files changed, 83 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5dfc897b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
index 2c85650..a217923 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
@@ -17,13 +17,6 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
@@ -31,6 +24,13 @@ import org.apache.flink.core.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Long, Long>> {
 	private static final long serialVersionUID = 1L;
 
@@ -95,16 +95,21 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 
 		FileStatus[] statuses = fileSystem.listStatus(new Path(path));
 
-		for (FileStatus status : statuses) {
-			Path filePath = status.getPath();
-			String fileName = filePath.getName();
-			long modificationTime = status.getModificationTime();
-
-			if (!isFiltered(fileName, modificationTime)) {
-				files.add(filePath.toString());
-				modificationTimes.put(fileName, modificationTime);
+		if (statuses == null) {
+			LOG.warn("Path does not exist: {}", path);
+		} else {
+			for (FileStatus status : statuses) {
+				Path filePath = status.getPath();
+				String fileName = filePath.getName();
+				long modificationTime = status.getModificationTime();
+
+				if (!isFiltered(fileName, modificationTime)) {
+					files.add(filePath.toString());
+					modificationTimes.put(fileName, modificationTime);
+				}
 			}
 		}
+
 		return files;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5dfc897b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
new file mode 100644
index 0000000..2d9921a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *	 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction}.
+ */
+public class FileMonitoringFunctionTest {
+
+	@Test
+	public void testForEmptyLocation() throws Exception {
+		final FileMonitoringFunction fileMonitoringFunction
+			= new FileMonitoringFunction("?non-existing-path", 1L, FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
+
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(1000L);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                fileMonitoringFunction.cancel();
+            }
+        }.start();
+
+		fileMonitoringFunction.run(
+            new StreamSource.NonWatermarkContext<Tuple3<String, Long, Long>>(
+                new Object(),
+                new Output<StreamRecord<Tuple3<String, Long, Long>>>() {
+                    @Override
+                    public void emitWatermark(Watermark mark) { }
+                    @Override
+                    public void collect(StreamRecord<Tuple3<String, Long, Long>> record) { }
+                    @Override
+                    public void close() { }
+                })
+        );
+	}
+}
\ No newline at end of file