You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/09/16 10:35:24 UTC
[1/4] flink git commit: [FLINK-4626] Add missing break in
MetricStore#add()
Repository: flink
Updated Branches:
refs/heads/master 7143a2454 -> db90580ff
[FLINK-4626] Add missing break in MetricStore#add()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1586fc8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1586fc8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1586fc8c
Branch: refs/heads/master
Commit: 1586fc8cfc6f41244e46b27ad25dcb6b08b7ce43
Parents: 7143a24
Author: zentol <ch...@apache.org>
Authored: Fri Sep 16 12:31:55 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Sep 16 12:31:55 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/runtime/webmonitor/metrics/MetricStore.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1586fc8c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
index 41e68cc..5df63c6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
@@ -65,6 +65,7 @@ public class MetricStore {
switch (info.getCategory()) {
case INFO_CATEGORY_JM:
addMetric(jobManager.metrics, name, metric);
+ break;
case INFO_CATEGORY_TM:
String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
tm = taskManagers.get(tmID);
[2/4] flink git commit: [FLINK-4612] Close FileWriter using try with
resources
Posted by ch...@apache.org.
[FLINK-4612] Close FileWriter using try with resources
This closes #2492.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f06930bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f06930bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f06930bc
Branch: refs/heads/master
Commit: f06930bcf0d3c2a840cdc5a2e6e5f4b1d03f45d2
Parents: 1586fc8
Author: Alexander Pivovarov <ap...@gmail.com>
Authored: Sun Sep 11 22:46:55 2016 -0700
Committer: zentol <ch...@apache.org>
Committed: Fri Sep 16 12:34:16 2016 +0200
----------------------------------------------------------------------
.../apache/flink/testutils/TestFileUtils.java | 33 +++++-----------
.../relational/util/WebLogDataGenerator.java | 21 ++--------
.../flink/api/java/io/CsvInputFormatTest.java | 6 +--
.../api/java/io/PrimitiveInputFormatTest.java | 8 ++--
.../runtime/io/disk/FileChannelStreamsTest.java | 6 +--
.../runtime/operators/DataSourceTaskTest.java | 40 ++++++++++----------
.../cassandra/CassandraConnectorITCase.java | 23 ++++++-----
.../api/functions/sink/WriteFormatAsCsv.java | 4 +-
.../api/functions/sink/WriteFormatAsText.java | 4 +-
.../clients/examples/LocalExecutorITCase.java | 6 +--
.../aggregators/AggregatorsITCase.java | 6 +--
.../org/apache/flink/yarn/YarnTestBase.java | 8 ++--
12 files changed, 66 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java b/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
index 683bc4d..ebe68f0 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
@@ -54,12 +54,9 @@ public class TestFileUtils {
f.getParentFile().mkdirs();
f.createNewFile();
f.deleteOnExit();
-
- BufferedWriter out = new BufferedWriter(new FileWriter(f));
- try {
+
+ try (BufferedWriter out = new BufferedWriter(new FileWriter(f))) {
out.write(contents);
- } finally {
- out.close();
}
return f.toURI().toString();
}
@@ -73,13 +70,10 @@ public class TestFileUtils {
f.createNewFile();
f.deleteOnExit();
- BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(f));
- try {
+ try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(f))) {
for (; bytes > 0; bytes--) {
out.write(0);
}
- } finally {
- out.close();
}
return f.toURI().toString();
}
@@ -87,12 +81,9 @@ public class TestFileUtils {
public static String createTempFile(String contents) throws IOException {
File f = File.createTempFile(FILE_PREFIX, FILE_SUFFIX);
f.deleteOnExit();
-
- BufferedWriter out = new BufferedWriter(new FileWriter(f));
- try {
+
+ try (BufferedWriter out = new BufferedWriter(new FileWriter(f))) {
out.write(contents);
- } finally {
- out.close();
}
return f.toURI().toString();
}
@@ -111,14 +102,11 @@ public class TestFileUtils {
for (long l : bytes) {
File child = new File(f, randomFileName());
child.deleteOnExit();
-
- BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(child));
- try {
+
+ try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(child))) {
for (; l > 0; l--) {
out.write(0);
}
- } finally {
- out.close();
}
}
return f.toURI().toString();
@@ -140,12 +128,9 @@ public class TestFileUtils {
for (String s : contents) {
File child = new File(f, randomFileName(fileExtension));
child.deleteOnExit();
-
- BufferedWriter out = new BufferedWriter(new FileWriter(child));
- try {
+
+ try (BufferedWriter out = new BufferedWriter(new FileWriter(child))) {
out.write(s);
- } finally {
- out.close();
}
}
return f.toURI().toString();
http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java
index 8efa243..e8dbe25 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java
@@ -99,9 +99,7 @@ public class WebLogDataGenerator {
Random rand = new Random(Calendar.getInstance().getTimeInMillis());
- try {
- FileWriter fw = new FileWriter(path);
-
+ try (FileWriter fw = new FileWriter(path)) {
for (int i = 0; i < noDocs; i++) {
int wordsInDoc = rand.nextInt(40) + 10;
@@ -110,8 +108,7 @@ public class WebLogDataGenerator {
for (int j = 0; j < wordsInDoc; j++) {
if (rand.nextDouble() > 0.9) {
// Approx. every 10th word is a keyword
- doc.append(filterKeyWords[rand
- .nextInt(filterKeyWords.length)] + " ");
+ doc.append(filterKeyWords[rand.nextInt(filterKeyWords.length)] + " ");
} else {
// Fills up the docs file(s) with random words
doc.append(words[rand.nextInt(words.length)] + " ");
@@ -121,8 +118,6 @@ public class WebLogDataGenerator {
fw.write(doc.toString());
}
- fw.close();
-
} catch (IOException e) {
e.printStackTrace();
}
@@ -142,9 +137,7 @@ public class WebLogDataGenerator {
Random rand = new Random(Calendar.getInstance().getTimeInMillis());
- try {
- FileWriter fw = new FileWriter(path);
-
+ try (FileWriter fw = new FileWriter(path)) {
for (int i = 0; i < noDocs; i++) {
// Rank
StringBuilder rank = new StringBuilder(rand.nextInt(100) + "|");
@@ -155,8 +148,6 @@ public class WebLogDataGenerator {
fw.write(rank.toString());
}
- fw.close();
-
} catch (IOException e) {
e.printStackTrace();
}
@@ -178,9 +169,7 @@ public class WebLogDataGenerator {
Random rand = new Random(Calendar.getInstance().getTimeInMillis());
- try {
- FileWriter fw = new FileWriter(path);
-
+ try (FileWriter fw = new FileWriter(path)) {
for (int i = 0; i < noVisits; i++) {
int year = 2000 + rand.nextInt(10); // yearFilter 3
@@ -200,8 +189,6 @@ public class WebLogDataGenerator {
fw.write(visit.toString());
}
- fw.close();
-
} catch (IOException e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index ecf55c3..fa091d9 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -78,9 +78,9 @@ public class CsvInputFormatTest {
final File tempFile = File.createTempFile("input-stream-decoration-test", "tmp");
tempFile.deleteOnExit();
- FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
- fileOutputStream.write(fileContent.getBytes());
- fileOutputStream.close();
+ try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
+ fileOutputStream.write(fileContent.getBytes());
+ }
// fix the number of blocks and the size of each one.
final int noOfBlocks = 3;
http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
index 4a92702..f9dc28a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
@@ -179,11 +179,11 @@ public class PrimitiveInputFormatTest {
File tempFile = File.createTempFile("test_contents", "tmp");
tempFile.deleteOnExit();
- FileWriter wrt = new FileWriter(tempFile);
- wrt.write(content);
- wrt.close();
+ try (FileWriter wrt = new FileWriter(tempFile)) {
+ wrt.write(content);
+ }
return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
index 1c2b3de..1044a35 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
@@ -85,10 +85,8 @@ public class FileChannelStreamsTest {
FileIOChannel.ID channel = ioManager.createChannel();
// add some test data
- {
- FileWriter wrt = new FileWriter(channel.getPath());
+ try (FileWriter wrt = new FileWriter(channel.getPath())) {
wrt.write("test data");
- wrt.close();
}
BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
@@ -116,4 +114,4 @@ public class FileChannelStreamsTest {
ioManager.shutdown();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 95f2991..0388c2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -221,28 +221,26 @@ public class DataSourceTaskTest extends TaskTestBase {
private static class InputFilePreparator {
public static void prepareInputFile(MutableObjectIterator<Record> inIt, String inputFilePath, boolean insertInvalidData)
- throws IOException
- {
- FileWriter fw = new FileWriter(inputFilePath);
- BufferedWriter bw = new BufferedWriter(fw);
-
- if (insertInvalidData) {
- bw.write("####_I_AM_INVALID_########\n");
- }
-
- Record rec = new Record();
- while ((rec = inIt.next(rec)) != null) {
- IntValue key = rec.getField(0, IntValue.class);
- IntValue value = rec.getField(1, IntValue.class);
-
- bw.write(key.getValue() + "_" + value.getValue() + "\n");
- }
- if (insertInvalidData) {
- bw.write("####_I_AM_INVALID_########\n");
+ throws IOException {
+
+ try (BufferedWriter bw = new BufferedWriter(new FileWriter(inputFilePath))) {
+ if (insertInvalidData) {
+ bw.write("####_I_AM_INVALID_########\n");
+ }
+
+ Record rec = new Record();
+ while ((rec = inIt.next(rec)) != null) {
+ IntValue key = rec.getField(0, IntValue.class);
+ IntValue value = rec.getField(1, IntValue.class);
+
+ bw.write(key.getValue() + "_" + value.getValue() + "\n");
+ }
+ if (insertInvalidData) {
+ bw.write("####_I_AM_INVALID_########\n");
+ }
+
+ bw.flush();
}
-
- bw.flush();
- bw.close();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index f94ff68..cc4a527 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -166,17 +166,20 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
assertTrue(tmp.createNewFile());
- BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
-
- //copy cassandra.yaml; inject absolute paths into cassandra.yaml
- Scanner scanner = new Scanner(file);
- while (scanner.hasNextLine()) {
- String line = scanner.nextLine();
- line = line.replace("$PATH", "'" + tmp.getParentFile());
- b.write(line + "\n");
- b.flush();
+
+ try (
+ BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
+
+ //copy cassandra.yaml; inject absolute paths into cassandra.yaml
+ Scanner scanner = new Scanner(file);
+ ) {
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine();
+ line = line.replace("$PATH", "'" + tmp.getParentFile());
+ b.write(line + "\n");
+ b.flush();
+ }
}
- scanner.close();
// Tell cassandra where the configuration files are.
http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
index 22f304a..da03859 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
@@ -37,13 +37,11 @@ public class WriteFormatAsCsv<IN> extends WriteFormat<IN> {
@Override
protected void write(String path, ArrayList<IN> tupleList) {
- try {
- PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
+ try (PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)))) {
for (IN tupleToWrite : tupleList) {
String strTuple = tupleToWrite.toString();
outStream.println(strTuple.substring(1, strTuple.length() - 1));
}
- outStream.close();
} catch (IOException e) {
throw new RuntimeException("Exception occured while writing file " + path, e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
index c36bc9e..e331ed9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
@@ -37,12 +37,10 @@ public class WriteFormatAsText<IN> extends WriteFormat<IN> {
@Override
public void write(String path, ArrayList<IN> tupleList) {
- try {
- PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
+ try (PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)))) {
for (IN tupleToWrite : tupleList) {
outStream.println(tupleToWrite);
}
- outStream.close();
} catch (IOException e) {
throw new RuntimeException("Exception occured while writing file " + path, e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
index 5b05b67..4ed28a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
@@ -46,9 +46,9 @@ public class LocalExecutorITCase {
inFile.deleteOnExit();
outFile.deleteOnExit();
- FileWriter fw = new FileWriter(inFile);
- fw.write(WordCountData.TEXT);
- fw.close();
+ try (FileWriter fw = new FileWriter(inFile)) {
+ fw.write(WordCountData.TEXT);
+ }
LocalExecutor executor = new LocalExecutor();
executor.setDefaultOverwriteFiles(true);
http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index 8b98b29..4c5e955 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -91,9 +91,9 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
@Test
public void testDistributedCacheWithIterations() throws Exception{
File tempFile = new File(testPath);
- FileWriter writer = new FileWriter(tempFile);
- writer.write(testString);
- writer.close();
+ try (FileWriter writer = new FileWriter(tempFile)) {
+ writer.write(testString);
+ }
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerCachedFile(resultPath, testName);
http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 31a3d98..6270010 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -246,10 +246,10 @@ public abstract class YarnTestBase extends TestLogger {
tmp.create();
File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml");
- FileWriter writer = new FileWriter(yarnSiteXML);
- yarnConf.writeXml(writer);
- writer.flush();
- writer.close();
+ try (FileWriter writer = new FileWriter(yarnSiteXML)) {
+ yarnConf.writeXml(writer);
+ writer.flush();
+ }
return yarnSiteXML;
}
[4/4] flink git commit: [FLINK-4607] Close FileInputStream in
ParameterTool and other
Posted by ch...@apache.org.
[FLINK-4607] Close FileInputStream in ParameterTool and other
This closes #2488.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db90580f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db90580f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db90580f
Branch: refs/heads/master
Commit: db90580ffdb93a55a6318b18b5d50ec3666b001b
Parents: 9046374
Author: Alexander Pivovarov <ap...@gmail.com>
Authored: Fri Sep 9 20:32:28 2016 -0700
Committer: zentol <ch...@apache.org>
Committed: Fri Sep 16 12:34:45 2016 +0200
----------------------------------------------------------------------
.../apache/flink/core/fs/local/LocalFileSystemTest.java | 6 +++---
.../org/apache/flink/api/java/utils/ParameterTool.java | 6 +++---
.../apache/flink/api/java/utils/ParameterToolTest.java | 4 +++-
.../flink/runtime/util/JarFileCreatorLambdaTest.java | 12 ++++++------
.../apache/flink/runtime/util/JarFileCreatorTest.java | 12 ++++++------
5 files changed, 21 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/db90580f/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
index 299524d..d21e0f1 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
@@ -116,10 +116,10 @@ public class LocalFileSystemTest {
assertEquals(testfile1.length(), 5L);
- final FileInputStream fisfile1 = new FileInputStream(testfile1);
byte[] testbytestest = new byte[5];
- assertEquals(testbytestest.length, fisfile1.read(testbytestest));
- fisfile1.close();
+ try (FileInputStream fisfile1 = new FileInputStream(testfile1)) {
+ assertEquals(testbytestest.length, fisfile1.read(testbytestest));
+ }
assertArrayEquals(testbytes, testbytestest);
http://git-wip-us.apache.org/repos/asf/flink/blob/db90580f/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 38ae6df..a9389a5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -156,9 +156,9 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
throw new FileNotFoundException("Properties file " + propertiesFile.getAbsolutePath() + " does not exist");
}
Properties props = new Properties();
- FileInputStream fis = new FileInputStream(propertiesFile);
- props.load(fis);
- fis.close();
+ try (FileInputStream fis = new FileInputStream(propertiesFile)) {
+ props.load(fis);
+ }
return fromMap((Map)props);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db90580f/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
index 09a7781..605f033 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
@@ -185,7 +185,9 @@ public class ParameterToolTest {
String pathToFile = tmp.newFile().getAbsolutePath();
parameter.createPropertiesFile(pathToFile);
Properties defaultProps = new Properties();
- defaultProps.load(new FileInputStream(pathToFile));
+ try (FileInputStream fis = new FileInputStream(pathToFile)) {
+ defaultProps.load(fis);
+ }
Assert.assertEquals("myDefaultValue", defaultProps.get("output"));
Assert.assertEquals("-1", defaultProps.get("expectedCount"));
http://git-wip-us.apache.org/repos/asf/flink/blob/db90580f/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
index 249e082..d90f096 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
@@ -98,13 +98,13 @@ public class JarFileCreatorLambdaTest {
}
public boolean validate(Set<String> expected, File out) throws Exception {
-
- JarInputStream jis = new JarInputStream(new FileInputStream(out));
- ZipEntry ze;
int count = expected.size();
- while ((ze = jis.getNextEntry()) != null) {
- count--;
- expected.remove(ze.getName());
+ try (JarInputStream jis = new JarInputStream(new FileInputStream(out))) {
+ ZipEntry ze;
+ while ((ze = jis.getNextEntry()) != null) {
+ count--;
+ expected.remove(ze.getName());
+ }
}
return count == 0 && expected.size() == 0;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db90580f/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
index ba207ec..8f8016e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
@@ -205,13 +205,13 @@ public class JarFileCreatorTest {
}
private boolean validate(Set<String> expected, File out) throws IOException {
-
- JarInputStream jis = new JarInputStream(new FileInputStream(out));
- ZipEntry ze;
int count = expected.size();
- while ((ze = jis.getNextEntry()) != null) {
- count--;
- expected.remove(ze.getName());
+ try (JarInputStream jis = new JarInputStream(new FileInputStream(out))) {
+ ZipEntry ze;
+ while ((ze = jis.getNextEntry()) != null) {
+ count--;
+ expected.remove(ze.getName());
+ }
}
return count == 0 && expected.size() == 0;
}
[3/4] flink git commit: [FLINK-4608] Use short-circuit AND in Max/Min
AggregationFunction
Posted by ch...@apache.org.
[FLINK-4608] Use short-circuit AND in Max/Min AggregationFunction
This closes #2489.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9046374c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9046374c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9046374c
Branch: refs/heads/master
Commit: 9046374c6b5b35589d90416725c1e8eb09843bea
Parents: f06930b
Author: Alexander Pivovarov <ap...@gmail.com>
Authored: Fri Sep 9 21:52:36 2016 -0700
Committer: zentol <ch...@apache.org>
Committed: Fri Sep 16 12:34:34 2016 +0200
----------------------------------------------------------------------
.../apache/flink/api/java/aggregation/MaxAggregationFunction.java | 2 +-
.../apache/flink/api/java/aggregation/MinAggregationFunction.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9046374c/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
index d1edbe2..69715c3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
@@ -98,7 +98,7 @@ public abstract class MaxAggregationFunction<T extends Comparable<T>> extends Ag
@Override
public <T> AggregationFunction<T> createAggregationFunction(Class<T> type) {
if (Comparable.class.isAssignableFrom(type)) {
- if (ResettableValue.class.isAssignableFrom(type) & CopyableValue.class.isAssignableFrom(type)) {
+ if (ResettableValue.class.isAssignableFrom(type) && CopyableValue.class.isAssignableFrom(type)) {
return (AggregationFunction<T>) new MutableMaxAgg();
} else {
return (AggregationFunction<T>) new ImmutableMaxAgg();
http://git-wip-us.apache.org/repos/asf/flink/blob/9046374c/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
index a4a9e0e..c7ebcc5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
@@ -98,7 +98,7 @@ public abstract class MinAggregationFunction<T extends Comparable<T>> extends Ag
@Override
public <T> AggregationFunction<T> createAggregationFunction(Class<T> type) {
if (Comparable.class.isAssignableFrom(type)) {
- if (ResettableValue.class.isAssignableFrom(type) & CopyableValue.class.isAssignableFrom(type)) {
+ if (ResettableValue.class.isAssignableFrom(type) && CopyableValue.class.isAssignableFrom(type)) {
return (AggregationFunction<T>) new MutableMinAgg();
} else {
return (AggregationFunction<T>) new ImmutableMinAgg();