You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/09/16 10:35:24 UTC

[1/4] flink git commit: [FLINK-4626] Add missing break in MEtricStore#add()

Repository: flink
Updated Branches:
  refs/heads/master 7143a2454 -> db90580ff


[FLINK-4626] Add missing break in MEtricStore#add()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1586fc8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1586fc8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1586fc8c

Branch: refs/heads/master
Commit: 1586fc8cfc6f41244e46b27ad25dcb6b08b7ce43
Parents: 7143a24
Author: zentol <ch...@apache.org>
Authored: Fri Sep 16 12:31:55 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Sep 16 12:31:55 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/webmonitor/metrics/MetricStore.java    | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1586fc8c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
index 41e68cc..5df63c6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
@@ -65,6 +65,7 @@ public class MetricStore {
 			switch (info.getCategory()) {
 				case INFO_CATEGORY_JM:
 					addMetric(jobManager.metrics, name, metric);
+					break;
 				case INFO_CATEGORY_TM:
 					String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
 					tm = taskManagers.get(tmID);


[2/4] flink git commit: [FLINK-4612] Close FileWriter using try with resources

Posted by ch...@apache.org.
[FLINK-4612] Close FileWriter using try with resources

This closes #2492.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f06930bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f06930bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f06930bc

Branch: refs/heads/master
Commit: f06930bcf0d3c2a840cdc5a2e6e5f4b1d03f45d2
Parents: 1586fc8
Author: Alexander Pivovarov <ap...@gmail.com>
Authored: Sun Sep 11 22:46:55 2016 -0700
Committer: zentol <ch...@apache.org>
Committed: Fri Sep 16 12:34:16 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/testutils/TestFileUtils.java   | 33 +++++-----------
 .../relational/util/WebLogDataGenerator.java    | 21 ++--------
 .../flink/api/java/io/CsvInputFormatTest.java   |  6 +--
 .../api/java/io/PrimitiveInputFormatTest.java   |  8 ++--
 .../runtime/io/disk/FileChannelStreamsTest.java |  6 +--
 .../runtime/operators/DataSourceTaskTest.java   | 40 ++++++++++----------
 .../cassandra/CassandraConnectorITCase.java     | 23 ++++++-----
 .../api/functions/sink/WriteFormatAsCsv.java    |  4 +-
 .../api/functions/sink/WriteFormatAsText.java   |  4 +-
 .../clients/examples/LocalExecutorITCase.java   |  6 +--
 .../aggregators/AggregatorsITCase.java          |  6 +--
 .../org/apache/flink/yarn/YarnTestBase.java     |  8 ++--
 12 files changed, 66 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java b/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
index 683bc4d..ebe68f0 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
@@ -54,12 +54,9 @@ public class TestFileUtils {
 		f.getParentFile().mkdirs();
 		f.createNewFile();
 		f.deleteOnExit();
-		
-		BufferedWriter out = new BufferedWriter(new FileWriter(f));
-		try { 
+
+		try (BufferedWriter out = new BufferedWriter(new FileWriter(f))) {
 			out.write(contents);
-		} finally {
-			out.close();
 		}
 		return f.toURI().toString();
 	}
@@ -73,13 +70,10 @@ public class TestFileUtils {
 		f.createNewFile();
 		f.deleteOnExit();
 
-		BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(f));
-		try {
+		try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(f))) {
 			for (; bytes > 0; bytes--) {
 				out.write(0);
 			}
-		} finally {
-			out.close();
 		}
 		return f.toURI().toString();
 	}
@@ -87,12 +81,9 @@ public class TestFileUtils {
 	public static String createTempFile(String contents) throws IOException {
 		File f = File.createTempFile(FILE_PREFIX, FILE_SUFFIX);
 		f.deleteOnExit();
-		
-		BufferedWriter out = new BufferedWriter(new FileWriter(f));
-		try { 
+
+		try (BufferedWriter out = new BufferedWriter(new FileWriter(f))) {
 			out.write(contents);
-		} finally {
-			out.close();
 		}
 		return f.toURI().toString();
 	}
@@ -111,14 +102,11 @@ public class TestFileUtils {
 		for (long l : bytes) {
 			File child = new File(f, randomFileName());
 			child.deleteOnExit();
-		
-			BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(child));
-			try { 
+
+			try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(child))) {
 				for (; l > 0; l--) {
 					out.write(0);
 				}
-			} finally {
-				out.close();
 			}
 		}
 		return f.toURI().toString();
@@ -140,12 +128,9 @@ public class TestFileUtils {
 		for (String s : contents) {
 			File child = new File(f, randomFileName(fileExtension));
 			child.deleteOnExit();
-		
-			BufferedWriter out = new BufferedWriter(new FileWriter(child));
-			try { 
+
+			try (BufferedWriter out = new BufferedWriter(new FileWriter(child))) {
 				out.write(s);
-			} finally {
-				out.close();
 			}
 		}
 		return f.toURI().toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java
index 8efa243..e8dbe25 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java
@@ -99,9 +99,7 @@ public class WebLogDataGenerator {
 
 		Random rand = new Random(Calendar.getInstance().getTimeInMillis());
 
-		try {
-			FileWriter fw = new FileWriter(path);
-
+		try (FileWriter fw = new FileWriter(path)) {
 			for (int i = 0; i < noDocs; i++) {
 
 				int wordsInDoc = rand.nextInt(40) + 10;
@@ -110,8 +108,7 @@ public class WebLogDataGenerator {
 				for (int j = 0; j < wordsInDoc; j++) {
 					if (rand.nextDouble() > 0.9) {
 						// Approx. every 10th word is a keyword
-						doc.append(filterKeyWords[rand
-								.nextInt(filterKeyWords.length)] + " ");
+						doc.append(filterKeyWords[rand.nextInt(filterKeyWords.length)] + " ");
 					} else {
 						// Fills up the docs file(s) with random words
 						doc.append(words[rand.nextInt(words.length)] + " ");
@@ -121,8 +118,6 @@ public class WebLogDataGenerator {
 
 				fw.write(doc.toString());
 			}
-			fw.close();
-
 		} catch (IOException e) {
 			e.printStackTrace();
 		}
@@ -142,9 +137,7 @@ public class WebLogDataGenerator {
 
 		Random rand = new Random(Calendar.getInstance().getTimeInMillis());
 
-		try {
-			FileWriter fw = new FileWriter(path);
-
+		try (FileWriter fw = new FileWriter(path)) {
 			for (int i = 0; i < noDocs; i++) {
 				// Rank
 				StringBuilder rank = new StringBuilder(rand.nextInt(100) + "|");
@@ -155,8 +148,6 @@ public class WebLogDataGenerator {
 
 				fw.write(rank.toString());
 			}
-			fw.close();
-
 		} catch (IOException e) {
 			e.printStackTrace();
 		}
@@ -178,9 +169,7 @@ public class WebLogDataGenerator {
 
 		Random rand = new Random(Calendar.getInstance().getTimeInMillis());
 
-		try {
-			FileWriter fw = new FileWriter(path);
-
+		try (FileWriter fw = new FileWriter(path)) {
 			for (int i = 0; i < noVisits; i++) {
 
 				int year = 2000 + rand.nextInt(10); // yearFilter 3
@@ -200,8 +189,6 @@ public class WebLogDataGenerator {
 
 				fw.write(visit.toString());
 			}
-			fw.close();
-
 		} catch (IOException e) {
 			e.printStackTrace();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index ecf55c3..fa091d9 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -78,9 +78,9 @@ public class CsvInputFormatTest {
 		final File tempFile = File.createTempFile("input-stream-decoration-test", "tmp");
 		tempFile.deleteOnExit();
 
-		FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
-		fileOutputStream.write(fileContent.getBytes());
-		fileOutputStream.close();
+		try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
+			fileOutputStream.write(fileContent.getBytes());
+		}
 
 		// fix the number of blocks and the size of each one.
 		final int noOfBlocks = 3;

http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
index 4a92702..f9dc28a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
@@ -179,11 +179,11 @@ public class PrimitiveInputFormatTest {
 		File tempFile = File.createTempFile("test_contents", "tmp");
 		tempFile.deleteOnExit();
 
-		FileWriter wrt = new FileWriter(tempFile);
-		wrt.write(content);
-		wrt.close();
+		try (FileWriter wrt = new FileWriter(tempFile)) {
+			wrt.write(content);
+		}
 
 		return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
 	}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
index 1c2b3de..1044a35 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
@@ -85,10 +85,8 @@ public class FileChannelStreamsTest {
 			FileIOChannel.ID channel = ioManager.createChannel();
 			
 			// add some test data
-			{
-				FileWriter wrt = new FileWriter(channel.getPath());
+			try (FileWriter wrt = new FileWriter(channel.getPath())) {
 				wrt.write("test data");
-				wrt.close();
 			}
 			
 			BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
@@ -116,4 +114,4 @@ public class FileChannelStreamsTest {
 			ioManager.shutdown();
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 95f2991..0388c2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -221,28 +221,26 @@ public class DataSourceTaskTest extends TaskTestBase {
 	
 	private static class InputFilePreparator {
 		public static void prepareInputFile(MutableObjectIterator<Record> inIt, String inputFilePath, boolean insertInvalidData)
-		throws IOException
-		{
-			FileWriter fw = new FileWriter(inputFilePath);
-			BufferedWriter bw = new BufferedWriter(fw);
-			
-			if (insertInvalidData) {
-				bw.write("####_I_AM_INVALID_########\n");
-			}
-			
-			Record rec = new Record();
-			while ((rec = inIt.next(rec)) != null) {
-				IntValue key = rec.getField(0, IntValue.class);
-				IntValue value = rec.getField(1, IntValue.class);
-				
-				bw.write(key.getValue() + "_" + value.getValue() + "\n");
-			}
-			if (insertInvalidData) {
-				bw.write("####_I_AM_INVALID_########\n");
+		throws IOException {
+
+			try (BufferedWriter bw = new BufferedWriter(new FileWriter(inputFilePath))) {
+				if (insertInvalidData) {
+					bw.write("####_I_AM_INVALID_########\n");
+				}
+
+				Record rec = new Record();
+				while ((rec = inIt.next(rec)) != null) {
+					IntValue key = rec.getField(0, IntValue.class);
+					IntValue value = rec.getField(1, IntValue.class);
+
+					bw.write(key.getValue() + "_" + value.getValue() + "\n");
+				}
+				if (insertInvalidData) {
+					bw.write("####_I_AM_INVALID_########\n");
+				}
+
+				bw.flush();
 			}
-			
-			bw.flush();
-			bw.close();
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index f94ff68..cc4a527 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -166,17 +166,20 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
 		
 		assertTrue(tmp.createNewFile());
-		BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
-
-		//copy cassandra.yaml; inject absolute paths into cassandra.yaml
-		Scanner scanner = new Scanner(file);
-		while (scanner.hasNextLine()) {
-			String line = scanner.nextLine();
-			line = line.replace("$PATH", "'" + tmp.getParentFile());
-			b.write(line + "\n");
-			b.flush();
+
+		try (
+			BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
+
+			//copy cassandra.yaml; inject absolute paths into cassandra.yaml
+			Scanner scanner = new Scanner(file);
+		) {
+			while (scanner.hasNextLine()) {
+				String line = scanner.nextLine();
+				line = line.replace("$PATH", "'" + tmp.getParentFile());
+				b.write(line + "\n");
+				b.flush();
+			}
 		}
-		scanner.close();
 
 
 		// Tell cassandra where the configuration files are.

http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
index 22f304a..da03859 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
@@ -37,13 +37,11 @@ public class WriteFormatAsCsv<IN> extends WriteFormat<IN> {
 
 	@Override
 	protected void write(String path, ArrayList<IN> tupleList) {
-		try {
-			PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
+		try (PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)))) {
 			for (IN tupleToWrite : tupleList) {
 				String strTuple = tupleToWrite.toString();
 				outStream.println(strTuple.substring(1, strTuple.length() - 1));
 			}
-			outStream.close();
 		} catch (IOException e) {
 			throw new RuntimeException("Exception occured while writing file " + path, e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
index c36bc9e..e331ed9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
@@ -37,12 +37,10 @@ public class WriteFormatAsText<IN> extends WriteFormat<IN> {
 
 	@Override
 	public void write(String path, ArrayList<IN> tupleList) {
-		try {
-			PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
+		try (PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)))) {
 			for (IN tupleToWrite : tupleList) {
 				outStream.println(tupleToWrite);
 			}
-			outStream.close();
 		} catch (IOException e) {
 			throw new RuntimeException("Exception occured while writing file " + path, e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
index 5b05b67..4ed28a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
@@ -46,9 +46,9 @@ public class LocalExecutorITCase {
 			inFile.deleteOnExit();
 			outFile.deleteOnExit();
 			
-			FileWriter fw = new FileWriter(inFile);
-			fw.write(WordCountData.TEXT);
-			fw.close();
+			try (FileWriter fw = new FileWriter(inFile)) {
+				fw.write(WordCountData.TEXT);
+			}
 
 			LocalExecutor executor = new LocalExecutor();
 			executor.setDefaultOverwriteFiles(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index 8b98b29..4c5e955 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -91,9 +91,9 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 	@Test
 	public void testDistributedCacheWithIterations() throws Exception{
 		File tempFile = new File(testPath);
-		FileWriter writer = new FileWriter(tempFile);
-		writer.write(testString);
-		writer.close();
+		try (FileWriter writer = new FileWriter(tempFile)) {
+			writer.write(testString);
+		}
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.registerCachedFile(resultPath, testName);

http://git-wip-us.apache.org/repos/asf/flink/blob/f06930bc/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 31a3d98..6270010 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -246,10 +246,10 @@ public abstract class YarnTestBase extends TestLogger {
 		tmp.create();
 		File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml");
 
-		FileWriter writer = new FileWriter(yarnSiteXML);
-		yarnConf.writeXml(writer);
-		writer.flush();
-		writer.close();
+		try (FileWriter writer = new FileWriter(yarnSiteXML)) {
+			yarnConf.writeXml(writer);
+			writer.flush();
+		}
 		return yarnSiteXML;
 	}
 


[4/4] flink git commit: [FLINK-4607] Close FileInputStream in ParameterTool and other

Posted by ch...@apache.org.
[FLINK-4607] Close FileInputStream in ParameterTool and other

This closes #2488.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db90580f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db90580f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db90580f

Branch: refs/heads/master
Commit: db90580ffdb93a55a6318b18b5d50ec3666b001b
Parents: 9046374
Author: Alexander Pivovarov <ap...@gmail.com>
Authored: Fri Sep 9 20:32:28 2016 -0700
Committer: zentol <ch...@apache.org>
Committed: Fri Sep 16 12:34:45 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/core/fs/local/LocalFileSystemTest.java |  6 +++---
 .../org/apache/flink/api/java/utils/ParameterTool.java  |  6 +++---
 .../apache/flink/api/java/utils/ParameterToolTest.java  |  4 +++-
 .../flink/runtime/util/JarFileCreatorLambdaTest.java    | 12 ++++++------
 .../apache/flink/runtime/util/JarFileCreatorTest.java   | 12 ++++++------
 5 files changed, 21 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db90580f/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
index 299524d..d21e0f1 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
@@ -116,10 +116,10 @@ public class LocalFileSystemTest {
 
 			assertEquals(testfile1.length(), 5L);
 
-			final FileInputStream fisfile1 = new FileInputStream(testfile1);
 			byte[] testbytestest = new byte[5];
-			assertEquals(testbytestest.length, fisfile1.read(testbytestest));
-			fisfile1.close();
+			try (FileInputStream fisfile1 = new FileInputStream(testfile1)) {
+				assertEquals(testbytestest.length, fisfile1.read(testbytestest));
+			}
 			
 			assertArrayEquals(testbytes, testbytestest);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db90580f/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 38ae6df..a9389a5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -156,9 +156,9 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 			throw new FileNotFoundException("Properties file " + propertiesFile.getAbsolutePath() + " does not exist");
 		}
 		Properties props = new Properties();
-		FileInputStream fis = new FileInputStream(propertiesFile);
-		props.load(fis);
-		fis.close();
+		try (FileInputStream fis = new FileInputStream(propertiesFile)) {
+			props.load(fis);
+		}
 		return fromMap((Map)props);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db90580f/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
index 09a7781..605f033 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
@@ -185,7 +185,9 @@ public class ParameterToolTest {
 			String pathToFile = tmp.newFile().getAbsolutePath();
 			parameter.createPropertiesFile(pathToFile);
 			Properties defaultProps = new Properties();
-			defaultProps.load(new FileInputStream(pathToFile));
+			try (FileInputStream fis = new FileInputStream(pathToFile)) {
+				defaultProps.load(fis);
+			}
 
 			Assert.assertEquals("myDefaultValue", defaultProps.get("output"));
 			Assert.assertEquals("-1", defaultProps.get("expectedCount"));

http://git-wip-us.apache.org/repos/asf/flink/blob/db90580f/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
index 249e082..d90f096 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
@@ -98,13 +98,13 @@ public class JarFileCreatorLambdaTest {
 	}
 
 	public boolean validate(Set<String> expected, File out) throws Exception {
-
-		JarInputStream jis = new JarInputStream(new FileInputStream(out));
-		ZipEntry ze;
 		int count = expected.size();
-		while ((ze = jis.getNextEntry()) != null) {
-			count--;
-			expected.remove(ze.getName());
+		try (JarInputStream jis = new JarInputStream(new FileInputStream(out))) {
+			ZipEntry ze;
+			while ((ze = jis.getNextEntry()) != null) {
+				count--;
+				expected.remove(ze.getName());
+			}
 		}
 		return count == 0 && expected.size() == 0;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/db90580f/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
index ba207ec..8f8016e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
@@ -205,13 +205,13 @@ public class JarFileCreatorTest {
 	}
 
 	private boolean validate(Set<String> expected, File out) throws IOException {
-
-		JarInputStream jis = new JarInputStream(new FileInputStream(out));
-		ZipEntry ze;
 		int count = expected.size();
-		while ((ze = jis.getNextEntry()) != null) {
-			count--;
-			expected.remove(ze.getName());
+		try (JarInputStream jis = new JarInputStream(new FileInputStream(out))) {
+			ZipEntry ze;
+			while ((ze = jis.getNextEntry()) != null) {
+				count--;
+				expected.remove(ze.getName());
+			}
 		}
 		return count == 0 && expected.size() == 0;
 	}


[3/4] flink git commit: [FLINK-4608] Use short-circuit AND in Max/Min AggregationFunction

Posted by ch...@apache.org.
[FLINK-4608] Use short-circuit AND in Max/Min AggregationFunction

This closes #2489.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9046374c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9046374c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9046374c

Branch: refs/heads/master
Commit: 9046374c6b5b35589d90416725c1e8eb09843bea
Parents: f06930b
Author: Alexander Pivovarov <ap...@gmail.com>
Authored: Fri Sep 9 21:52:36 2016 -0700
Committer: zentol <ch...@apache.org>
Committed: Fri Sep 16 12:34:34 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/api/java/aggregation/MaxAggregationFunction.java  | 2 +-
 .../apache/flink/api/java/aggregation/MinAggregationFunction.java  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9046374c/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
index d1edbe2..69715c3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
@@ -98,7 +98,7 @@ public abstract class MaxAggregationFunction<T extends Comparable<T>> extends Ag
 		@Override
 		public <T> AggregationFunction<T> createAggregationFunction(Class<T> type) {
 			if (Comparable.class.isAssignableFrom(type)) {
-				if (ResettableValue.class.isAssignableFrom(type) & CopyableValue.class.isAssignableFrom(type)) {
+				if (ResettableValue.class.isAssignableFrom(type) && CopyableValue.class.isAssignableFrom(type)) {
 					return (AggregationFunction<T>) new MutableMaxAgg();
 				} else {
 					return (AggregationFunction<T>) new ImmutableMaxAgg();

http://git-wip-us.apache.org/repos/asf/flink/blob/9046374c/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
index a4a9e0e..c7ebcc5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
@@ -98,7 +98,7 @@ public abstract class MinAggregationFunction<T extends Comparable<T>> extends Ag
 		@Override
 		public <T> AggregationFunction<T> createAggregationFunction(Class<T> type) {
 			if (Comparable.class.isAssignableFrom(type)) {
-				if (ResettableValue.class.isAssignableFrom(type) & CopyableValue.class.isAssignableFrom(type)) {
+				if (ResettableValue.class.isAssignableFrom(type) && CopyableValue.class.isAssignableFrom(type)) {
 					return (AggregationFunction<T>) new MutableMinAgg();
 				} else {
 					return (AggregationFunction<T>) new ImmutableMinAgg();