You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/11/19 17:14:28 UTC

[1/2] flink git commit: [FLINK-2913] [runtime] Ensure file streams are properly closes in FsStateBackend

Repository: flink
Updated Branches:
  refs/heads/master 864357bac -> 93622001e


[FLINK-2913] [runtime] Ensure file streams are properly closes in FsStateBackend

This closes #1353


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff52d289
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff52d289
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff52d289

Branch: refs/heads/master
Commit: ff52d289113560273830421eceef82028d8bc99c
Parents: 864357b
Author: tedyu <yu...@gmail.com>
Authored: Fri Nov 13 16:43:53 2015 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 19 15:27:54 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/state/filesystem/FsStateBackend.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff52d289/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 045c411..d7b392c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -268,9 +268,9 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 				continue;
 			}
 
-			ObjectOutputStream os = new ObjectOutputStream(outStream);
-			os.writeObject(state);
-			os.close();
+			try (ObjectOutputStream os = new ObjectOutputStream(outStream)) {
+				os.writeObject(state);
+			}
 			return new FileSerializableStateHandle<S>(targetPath);
 		}
 		


[2/2] flink git commit: [FLINK-3048] [tests] Increase stability of DataSinkTaskTest

Posted by se...@apache.org.
[FLINK-3048] [tests] Increase stability of DataSinkTaskTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93622001
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93622001
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93622001

Branch: refs/heads/master
Commit: 93622001e499fa04bb5c4a63b1b3ed09b270f5b9
Parents: ff52d28
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 19 15:50:46 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 19 15:50:46 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/operators/DataSinkTask.java   |  2 +-
 .../runtime/operators/DataSinkTaskTest.java     | 38 ++++++++++++--------
 2 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/93622001/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index d20bb89..addceea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -257,7 +257,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 				}
 			}
 
-			BatchTask.clearReaders(new MutableReader[]{inputReader});
+			BatchTask.clearReaders(new MutableReader<?>[]{inputReader});
 		}
 
 		if (!this.taskCanceled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/93622001/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index b741b64..6221706 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -31,12 +31,14 @@ import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,10 +51,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({Task.class, ResultPartitionWriter.class})
-public class DataSinkTaskTest extends TaskTestBase
-{
+public class DataSinkTaskTest extends TaskTestBase {
+	
 	private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class);
 
 	private static final int MEMORY_MANAGER_SIZE = 3 * 1024 * 1024;
@@ -358,8 +363,7 @@ public class DataSinkTaskTest extends TaskTestBase
 	}
 
 	@Test
-	public void testCancelDataSinkTask() {
-
+	public void testCancelDataSinkTask() throws Exception {
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new InfiniteInputIterator(), 0);
 
@@ -382,19 +386,25 @@ public class DataSinkTaskTest extends TaskTestBase
 		};
 		taskRunner.start();
 
-		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, testTask);
-		tct.start();
-
-		try {
-			tct.join();
-			taskRunner.join();
-		} catch(InterruptedException ie) {
-			Assert.fail("Joining threads failed");
+		File tempTestFile = new File(this.tempTestPath);
+		
+		// wait until the task created the file
+		long deadline = System.currentTimeMillis() + 60000;
+		while (!tempTestFile.exists() && System.currentTimeMillis() < deadline) {
+			Thread.sleep(10);
 		}
+		assertTrue("Task did not create file within 60 seconds", tempTestFile.exists());
+		
+		// cancel the task
+		Thread.sleep(500);
+		testTask.cancel();
+		taskRunner.interrupt();
+		
+		// wait for the canceling to complete
+		taskRunner.join();
 
 		// assert that temp file was created
-		File tempTestFile = new File(this.tempTestPath);
-		Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
+		assertFalse("Temp output file has not been removed", tempTestFile.exists());
 	}
 
 	@Test