You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/11/19 17:14:28 UTC
[1/2] flink git commit: [FLINK-2913] [runtime] Ensure file streams
are properly closes in FsStateBackend
Repository: flink
Updated Branches:
refs/heads/master 864357bac -> 93622001e
[FLINK-2913] [runtime] Ensure file streams are properly closes in FsStateBackend
This closes #1353
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff52d289
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff52d289
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff52d289
Branch: refs/heads/master
Commit: ff52d289113560273830421eceef82028d8bc99c
Parents: 864357b
Author: tedyu <yu...@gmail.com>
Authored: Fri Nov 13 16:43:53 2015 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 19 15:27:54 2015 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/state/filesystem/FsStateBackend.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ff52d289/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 045c411..d7b392c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -268,9 +268,9 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
continue;
}
- ObjectOutputStream os = new ObjectOutputStream(outStream);
- os.writeObject(state);
- os.close();
+ try (ObjectOutputStream os = new ObjectOutputStream(outStream)) {
+ os.writeObject(state);
+ }
return new FileSerializableStateHandle<S>(targetPath);
}
[2/2] flink git commit: [FLINK-3048] [tests] Increase stability of
DataSinkTaskTest
Posted by se...@apache.org.
[FLINK-3048] [tests] Increase stability of DataSinkTaskTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93622001
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93622001
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93622001
Branch: refs/heads/master
Commit: 93622001e499fa04bb5c4a63b1b3ed09b270f5b9
Parents: ff52d28
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 19 15:50:46 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 19 15:50:46 2015 +0100
----------------------------------------------------------------------
.../flink/runtime/operators/DataSinkTask.java | 2 +-
.../runtime/operators/DataSinkTaskTest.java | 38 ++++++++++++--------
2 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/93622001/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index d20bb89..addceea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -257,7 +257,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
}
}
- BatchTask.clearReaders(new MutableReader[]{inputReader});
+ BatchTask.clearReaders(new MutableReader<?>[]{inputReader});
}
if (!this.taskCanceled) {
http://git-wip-us.apache.org/repos/asf/flink/blob/93622001/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index b741b64..6221706 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -31,12 +31,14 @@ import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,10 +51,13 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
@RunWith(PowerMockRunner.class)
@PrepareForTest({Task.class, ResultPartitionWriter.class})
-public class DataSinkTaskTest extends TaskTestBase
-{
+public class DataSinkTaskTest extends TaskTestBase {
+
private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class);
private static final int MEMORY_MANAGER_SIZE = 3 * 1024 * 1024;
@@ -358,8 +363,7 @@ public class DataSinkTaskTest extends TaskTestBase
}
@Test
- public void testCancelDataSinkTask() {
-
+ public void testCancelDataSinkTask() throws Exception {
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addInput(new InfiniteInputIterator(), 0);
@@ -382,19 +386,25 @@ public class DataSinkTaskTest extends TaskTestBase
};
taskRunner.start();
- TaskCancelThread tct = new TaskCancelThread(1, taskRunner, testTask);
- tct.start();
-
- try {
- tct.join();
- taskRunner.join();
- } catch(InterruptedException ie) {
- Assert.fail("Joining threads failed");
+ File tempTestFile = new File(this.tempTestPath);
+
+ // wait until the task created the file
+ long deadline = System.currentTimeMillis() + 60000;
+ while (!tempTestFile.exists() && System.currentTimeMillis() < deadline) {
+ Thread.sleep(10);
}
+ assertTrue("Task did not create file within 60 seconds", tempTestFile.exists());
+
+ // cancel the task
+ Thread.sleep(500);
+ testTask.cancel();
+ taskRunner.interrupt();
+
+ // wait for the canceling to complete
+ taskRunner.join();
// assert that temp file was created
- File tempTestFile = new File(this.tempTestPath);
- Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
+ assertFalse("Temp output file has not been removed", tempTestFile.exists());
}
@Test