You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/10/11 14:30:39 UTC

[2/2] flink git commit: [FLINK-2817] [streaming] FileMonitoring function logs on empty location

[FLINK-2817] [streaming] FileMonitoring function logs on empty location

Instead of throwing NPE when location is empty

Closes #1251


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5dfc897b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5dfc897b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5dfc897b

Branch: refs/heads/master
Commit: 5dfc897beb99d2d8f6d7becba972ff5756b3fb19
Parents: 2475c82
Author: ehnalis <zo...@gmail.com>
Authored: Sat Oct 10 17:58:41 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sun Oct 11 14:29:09 2015 +0200

----------------------------------------------------------------------
 .../source/FileMonitoringFunction.java          | 35 ++++++-----
 .../source/FileMonitoringFunctionTest.java      | 63 ++++++++++++++++++++
 2 files changed, 83 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5dfc897b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
index 2c85650..a217923 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
@@ -17,13 +17,6 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
@@ -31,6 +24,13 @@ import org.apache.flink.core.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Long, Long>> {
 	private static final long serialVersionUID = 1L;
 
@@ -95,16 +95,21 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 
 		FileStatus[] statuses = fileSystem.listStatus(new Path(path));
 
-		for (FileStatus status : statuses) {
-			Path filePath = status.getPath();
-			String fileName = filePath.getName();
-			long modificationTime = status.getModificationTime();
-
-			if (!isFiltered(fileName, modificationTime)) {
-				files.add(filePath.toString());
-				modificationTimes.put(fileName, modificationTime);
+		if (statuses == null) {
+			LOG.warn("Path does not exist: {}", path);
+		} else {
+			for (FileStatus status : statuses) {
+				Path filePath = status.getPath();
+				String fileName = filePath.getName();
+				long modificationTime = status.getModificationTime();
+
+				if (!isFiltered(fileName, modificationTime)) {
+					files.add(filePath.toString());
+					modificationTimes.put(fileName, modificationTime);
+				}
 			}
 		}
+
 		return files;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5dfc897b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
new file mode 100644
index 0000000..2d9921a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *	 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction}.
+ */
+public class FileMonitoringFunctionTest {
+
+	@Test
+	public void testForEmptyLocation() throws Exception {
+		final FileMonitoringFunction fileMonitoringFunction
+			= new FileMonitoringFunction("?non-existing-path", 1L, FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
+
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(1000L);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                fileMonitoringFunction.cancel();
+            }
+        }.start();
+
+		fileMonitoringFunction.run(
+            new StreamSource.NonWatermarkContext<Tuple3<String, Long, Long>>(
+                new Object(),
+                new Output<StreamRecord<Tuple3<String, Long, Long>>>() {
+                    @Override
+                    public void emitWatermark(Watermark mark) { }
+                    @Override
+                    public void collect(StreamRecord<Tuple3<String, Long, Long>> record) { }
+                    @Override
+                    public void close() { }
+                })
+        );
+	}
+}
\ No newline at end of file