You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/14 12:10:08 UTC

[01/12] flink git commit: [FLINK-5332] [core] Synchronize FileSystem::initOutPathLocalFS() to prevent lost files when called concurrently.

Repository: flink
Updated Branches:
  refs/heads/master 6cfc841b5 -> e4c767a37


[FLINK-5332] [core] Synchronize FileSystem::initOutPathLocalFS() to prevent lost files when called concurrently.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f3ad58b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f3ad58b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f3ad58b

Branch: refs/heads/master
Commit: 2f3ad58b7b73463aa1827baef0eb2e9d87fdb882
Parents: 790153c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 13 19:12:12 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:32 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    | 268 ++++++++++---------
 .../core/fs/local/LocalDataOutputStream.java    |  24 +-
 .../flink/core/fs/InitOutputPathTest.java       | 265 ++++++++++++++++++
 .../flink/core/testutils/OneShotLatch.java      |   5 +
 .../apache/flink/test/util/TestBaseUtils.java   |  50 ++--
 5 files changed, 436 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2f3ad58b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 3dced6f..e6313aa 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -44,6 +44,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -86,6 +87,12 @@ public abstract class FileSystem {
 
 	private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
 
+	/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
+	 * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races */
+	private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);
+
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Create a SafetyNetCloseableRegistry for a Task. This method should be called at the beginning of the task's
 	 * main thread.
@@ -537,23 +544,37 @@ public abstract class FileSystem {
 	/**
 	 * Initializes output directories on local file systems according to the given write mode.
 	 *
-	 * WriteMode.NO_OVERWRITE &amp; parallel output:
-	 *  - A directory is created if the output path does not exist.
-	 *  - An existing directory is reused, files contained in the directory are NOT deleted.
-	 *  - An existing file raises an exception.
+	 * <ul>
+	 *   <li>WriteMode.NO_OVERWRITE &amp; parallel output:
+	 *     <ul>
+	 *       <li>A directory is created if the output path does not exist.</li>
+	 *       <li>An existing directory is reused, files contained in the directory are NOT deleted.</li>
+	 *       <li>An existing file raises an exception.</li>
+	 *     </ul>
+	 *   </li>
 	 *
-	 * WriteMode.NO_OVERWRITE &amp; NONE parallel output:
-	 *  - An existing file or directory raises an exception.
+	 *   <li>WriteMode.NO_OVERWRITE &amp; NONE parallel output:
+	 *     <ul>
+	 *       <li>An existing file or directory raises an exception.</li>
+	 *     </ul>
+	 *   </li>
 	 *
-	 * WriteMode.OVERWRITE &amp; parallel output:
-	 *  - A directory is created if the output path does not exist.
-	 *  - An existing directory is reused, files contained in the directory are NOT deleted.
-	 *  - An existing file is deleted and replaced by a new directory.
-	 *
-	 * WriteMode.OVERWRITE &amp; NONE parallel output:
-	 *  - An existing file or directory (and all its content) is deleted
+	 *   <li>WriteMode.OVERWRITE &amp; parallel output:
+	 *     <ul>
+	 *       <li>A directory is created if the output path does not exist.</li>
+	 *       <li>An existing directory is reused, files contained in the directory are NOT deleted.</li>
+	 *       <li>An existing file is deleted and replaced by a new directory.</li>
+	 *     </ul>
+	 *   </li>
 	 *
-	 * Files contained in an existing directory are not deleted, because multiple instances of a
+	 *   <li>WriteMode.OVERWRITE &amp; NONE parallel output:
+	 *     <ul>
+	 *       <li>An existing file or directory (and all its content) is deleted</li>
+	 *     </ul>
+	 *   </li>
+	 * </ul>
+	 * 
+	 * <p>Files contained in an existing directory are not deleted, because multiple instances of a
 	 * DataSinkTask might call this function at the same time and hence might perform concurrent
 	 * delete operations on the file system (possibly deleting output files of concurrently running tasks).
 	 * Since concurrent DataSinkTasks are not aware of each other, coordination of delete and create
@@ -561,48 +582,58 @@ public abstract class FileSystem {
 	 *
 	 * @param outPath Output path that should be prepared.
 	 * @param writeMode Write mode to consider.
-	 * @param createDirectory True, to initialize a directory at the given path, false otherwise.
+	 * @param createDirectory True, to initialize a directory at the given path, false to prepare space for a file.
+	 *    
 	 * @return True, if the path was successfully prepared, false otherwise.
-	 * @throws IOException
+	 * @throws IOException Thrown, if any of the file system access operations failed.
 	 */
 	public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
-		if (this.isDistributedFS()) {
+		if (isDistributedFS()) {
 			return false;
 		}
 
-		// NOTE: we sometimes see this code block fail due to a races when changes to the file system take small time fractions before being
-		//       visible to other threads. for example:
-		// - the check whether the directory exists returns false
-		// - the call to create the directory fails (some concurrent thread is creating the directory, locked)
-		// - the call to check whether the directory exists does not yet see the new directory (change is not committed)
+		// NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
+		// concurrently work in this method (multiple output formats writing locally) might end
+		// up deleting each other's directories and leave non-retrievable files, without necessarily
+		// causing an exception. That results in very subtle issues, like output files looking as if
+		// they are not getting created.
 
-		// try for 30 seconds
-		final long now = System.currentTimeMillis();
-		final long deadline = now + 30000;
+		// we acquire the lock interruptibly here, to make sure that concurrent threads waiting
+		// here can cancel faster
+		try {
+			OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
+		}
+		catch (InterruptedException e) {
+			// restore the interruption state
+			Thread.currentThread().interrupt();
 
-		Exception lastError = null;
+			// leave the method - we don't have the lock anyways 
+			throw new IOException("The thread was interrupted while trying to initialize the output directory");
+		}
 
-		do {
-			FileStatus status = null;
+		try {
+			FileStatus status;
 			try {
 				status = getFileStatus(outPath);
 			}
 			catch (FileNotFoundException e) {
 				// okay, the file is not there
+				status = null;
 			}
 
 			// check if path exists
 			if (status != null) {
 				// path exists, check write mode
 				switch (writeMode) {
+
 				case NO_OVERWRITE:
 					if (status.isDir() && createDirectory) {
 						return true;
 					} else {
 						// file may not be overwritten
-						throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " +
-								WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
-								" mode to overwrite existing files and directories.");
+						throw new IOException("File or directory already exists. Existing files and directories " +
+								"are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " + 
+								WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");
 					}
 
 				case OVERWRITE:
@@ -612,31 +643,27 @@ public abstract class FileSystem {
 							return true;
 						} else {
 							// we will write in a single file, delete directory
-							// (there is also no other thread trying to delete the directory, since there is only one writer).
 							try {
-								this.delete(outPath, true);
+								delete(outPath, true);
 							}
 							catch (IOException e) {
-								// due to races in some file systems, it may spuriously occur that a deleted the file looks
-								// as if it still exists and is gone a millisecond later, once the change is committed
-								// we ignore the exception, possibly fall through the loop later
-								lastError = e;
+								throw new IOException("Could not remove existing directory '" + outPath + 
+										"' to allow overwrite by result file", e);
 							}
 						}
 					}
 					else {
 						// delete file
 						try {
-							this.delete(outPath, false);
+							delete(outPath, false);
 						}
 						catch (IOException e) {
-							// Some other thread might already have deleted the file.
-							// If - for some other reason - the file could not be deleted,
-							// the error will be handled later.
-							lastError = e;
+							throw new IOException("Could not remove existing file '" + outPath +
+									"' to allow overwrite by result file/directory", e);
 						}
 					}
 					break;
+
 				default:
 					throw new IllegalArgumentException("Invalid write mode: " + writeMode);
 				}
@@ -644,54 +671,26 @@ public abstract class FileSystem {
 
 			if (createDirectory) {
 				// Output directory needs to be created
-
-				try {
-					if (!this.exists(outPath)) {
-						this.mkdirs(outPath);
-					}
-				}
-				catch (IOException e) {
-					// Some other thread might already have created the directory concurrently.
-					lastError = e;
+				if (!exists(outPath)) {
+					mkdirs(outPath);
 				}
 
 				// double check that the output directory exists
 				try {
-					FileStatus check = getFileStatus(outPath);
-					if (check != null) {
-						if (check.isDir()) {
-							return true;
-						}
-						else {
-							lastError = new IOException("FileSystem should create an output directory, but the path points to a file instead.");
-						}
-					}
-					// fall through the loop
+					return getFileStatus(outPath).isDir();
 				}
 				catch (FileNotFoundException e) {
-					// fall though the loop
+					return false;
 				}
-
 			}
 			else {
-				// check that the output path does not exist and an output file can be created by the output format.
-				return !this.exists(outPath);
-			}
-
-			// small delay to allow changes to make progress
-			try {
-				Thread.sleep(10);
-			}
-			catch (InterruptedException e) {
-				throw new IOException("Thread was interrupted");
+				// check that the output path does not exist and an output file
+				// can be created by the output format.
+				return !exists(outPath);
 			}
 		}
-		while (System.currentTimeMillis() < deadline);
-
-		if (lastError != null) {
-			throw new IOException("File system failed to prepare output path " + outPath + " with write mode " + writeMode.name(), lastError);
-		} else {
-			return false;
+		finally {
+			OUTPUT_DIRECTORY_INIT_LOCK.unlock();
 		}
 	}
 
@@ -716,58 +715,87 @@ public abstract class FileSystem {
 	 * @param outPath Output path that should be prepared.
 	 * @param writeMode Write mode to consider.
 	 * @param createDirectory True, to initialize a directory at the given path, false otherwise.
+	 *    
 	 * @return True, if the path was successfully prepared, false otherwise.
-	 * @throws IOException
+	 * 
+	 * @throws IOException Thrown, if any of the file system access operations failed.
 	 */
 	public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
-		if (!this.isDistributedFS()) {
+		if (!isDistributedFS()) {
 			return false;
 		}
 
-		// check if path exists
-		if (this.exists(outPath)) {
-			// path exists, check write mode
-			switch(writeMode) {
-			case NO_OVERWRITE:
-				// file or directory may not be overwritten
-				throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " +
-						WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
-							" mode to overwrite existing files and directories.");
-			case OVERWRITE:
-				// output path exists. We delete it and all contained files in case of a directory.
-				try {
-					this.delete(outPath, true);
-				} catch(IOException ioe) {
-					// Some other thread might already have deleted the path.
-					// If - for some other reason - the path could not be deleted,
-					// this will be handled later.
-				}
-				break;
-			default:
-				throw new IllegalArgumentException("Invalid write mode: "+writeMode);
-			}
+		// NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
+		// concurrently work in this method (multiple output formats in the same process) might end
+		// up deleting each other's directories and leave non-retrievable files, without necessarily
+		// causing an exception. That results in very subtle issues, like output files looking as if
+		// they are not getting created.
+
+		// we acquire the lock interruptibly here, to make sure that concurrent threads waiting
+		// here can cancel faster
+		try {
+			OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
 		}
+		catch (InterruptedException e) {
+			// restore the interruption state
+			Thread.currentThread().interrupt();
 
-		if (createDirectory) {
-			// Output directory needs to be created
-			try {
-				if (!this.exists(outPath)) {
-					this.mkdirs(outPath);
+			// leave the method - we don't have the lock anyways 
+			throw new IOException("The thread was interrupted while trying to initialize the output directory");
+		}
+
+		try {
+			// check if path exists
+			if (exists(outPath)) {
+				// path exists, check write mode
+				switch(writeMode) {
+	
+				case NO_OVERWRITE:
+					// file or directory may not be overwritten
+					throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " +
+							WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
+								" mode to overwrite existing files and directories.");
+	
+				case OVERWRITE:
+					// output path exists. We delete it and all contained files in case of a directory.
+					try {
+						delete(outPath, true);
+					} catch (IOException e) {
+						// Some other thread might already have deleted the path.
+						// If - for some other reason - the path could not be deleted,
+						// this will be handled later.
+					}
+					break;
+	
+				default:
+					throw new IllegalArgumentException("Invalid write mode: "+writeMode);
 				}
-			} catch(IOException ioe) {
-				// Some other thread might already have created the directory.
-				// If - for some other reason - the directory could not be created  
-				// and the path does not exist, this will be handled later.
 			}
-
-			// double check that the output directory exists
-			return this.exists(outPath) && this.getFileStatus(outPath).isDir();
-		} else {
-
-			// check that the output path does not exist and an output file can be created by the output format.
-			return !this.exists(outPath);
+	
+			if (createDirectory) {
+				// Output directory needs to be created
+				try {
+					if (!exists(outPath)) {
+						mkdirs(outPath);
+					}
+				} catch (IOException ioe) {
+					// Some other thread might already have created the directory.
+					// If - for some other reason - the directory could not be created  
+					// and the path does not exist, this will be handled later.
+				}
+	
+				// double check that the output directory exists
+				return exists(outPath) && getFileStatus(outPath).isDir();
+			}
+			else {
+				// single file case: check that the output path does not exist and
+				// an output file can be created by the output format.
+				return !exists(outPath);
+			}
+		}
+		finally {
+			OUTPUT_DIRECTORY_INIT_LOCK.unlock();
 		}
-
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2f3ad58b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
index c3b793d..5cc011b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
@@ -19,7 +19,6 @@
 package org.apache.flink.core.fs.local;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 
@@ -33,12 +32,8 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 @Internal
 public class LocalDataOutputStream extends FSDataOutputStream {
 
-	private static final int MAX_OPEN_TRIES = 3;
-	
-	/**
-	 * The file output stream used to write data.
-	 */
-	private FileOutputStream fos;
+	/** The file output stream used to write data.*/
+	private final FileOutputStream fos;
 
 	/**
 	 * Constructs a new <code>LocalDataOutputStream</code> object from a given {@link File} object.
@@ -49,20 +44,7 @@ public class LocalDataOutputStream extends FSDataOutputStream {
 	 *         thrown if the data output stream cannot be created
 	 */
 	public LocalDataOutputStream(final File file) throws IOException {
-		// we allow multiple tries to create the file, to increase resilience against spurious I/O failures
-		
-		FileNotFoundException lastException = null;
-		
-		for (int attempt = 0; attempt < MAX_OPEN_TRIES; attempt++) {
-			try {
-				this.fos = new FileOutputStream(file);
-				return;
-			}
-			catch (FileNotFoundException e) {
-				lastException = e;
-			}
-		}
-		throw lastException;
+		this.fos = new FileOutputStream(file);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2f3ad58b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
new file mode 100644
index 0000000..c332324
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.local.LocalDataOutputStream;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.powermock.api.mockito.PowerMockito.*;
+import static org.junit.Assert.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(LocalFileSystem.class)
+public class InitOutputPathTest {
+
+	@Rule
+	public final TemporaryFolder tempDir = new TemporaryFolder();
+
+	/**
+	 * This test validates that this test case makes sense - that the error can be produced
+	 * in the absence of synchronization, if the threads make progress in a certain way,
+	 * here enforced by latches.
+	 */
+	@Test
+	public void testErrorOccursUnSynchronized() throws Exception {
+		// deactivate the lock to produce the original un-synchronized state
+		Field lock = FileSystem.class.getDeclaredField("OUTPUT_DIRECTORY_INIT_LOCK");
+		lock.setAccessible(true);
+		lock.set(null, new NoOpLock());
+
+		try {
+			// in the original un-synchronized state, we can force the race to occur by using
+			// the proper latch order to control the progress of the concurrent threads
+			runTest(true);
+			fail("should fail with an exception");
+		}
+		catch (FileNotFoundException e) {
+			// expected
+		}
+		finally {
+			// reset the proper value
+			lock.set(null, new ReentrantLock(true));
+		}
+	}
+
+	@Test
+	public void testProperSynchronized() throws Exception {
+		// in the synchronized variant, we cannot use the "await latches" because not 
+		// both threads can make progress interleaved (due to the synchronization)
+		// the test uses sleeps (rather than latches) to produce the same interleaving.
+		// while that is not guaranteed to produce the pathological interleaving,
+		// it helps to provoke it very often. together with validating that this order
+		// is in fact pathological (see testErrorOccursUnSynchronized()), this gives
+		// a rather confident guard
+		runTest(false);
+	}
+
+	private void runTest(final boolean useAwaits) throws Exception {
+		final File tempFile = tempDir.newFile();
+		final Path path1 = new Path(tempFile.getAbsolutePath(), "1");
+		final Path path2 = new Path(tempFile.getAbsolutePath(), "2");
+
+		final OneShotLatch deleteAwaitLatch1 = new OneShotLatch();
+		final OneShotLatch deleteAwaitLatch2 = new OneShotLatch();
+		final OneShotLatch mkdirsAwaitLatch1 = new OneShotLatch();
+		final OneShotLatch mkdirsAwaitLatch2 = new OneShotLatch();
+
+		final OneShotLatch deleteTriggerLatch1 = new OneShotLatch();
+		final OneShotLatch deletetriggerLatch2 = new OneShotLatch();
+		final OneShotLatch mkdirsTriggerLatch1 = new OneShotLatch();
+		final OneShotLatch mkdirsTriggerLatch2 = new OneShotLatch();
+
+		final OneShotLatch createAwaitLatch = new OneShotLatch();
+		final OneShotLatch createTriggerLatch = new OneShotLatch();
+
+		// this "new LocalDataOutputStream()" is in the end called by the async threads
+		whenNew(LocalDataOutputStream.class).withAnyArguments().thenAnswer(new Answer<LocalDataOutputStream>() {
+
+			@Override
+			public LocalDataOutputStream answer(InvocationOnMock invocation) throws Throwable {
+				createAwaitLatch.trigger();
+				createTriggerLatch.await();
+
+				final File file = (File) invocation.getArguments()[0];
+				return new LocalDataOutputStream(file);
+			}
+		});
+
+		final LocalFileSystem fs1 = new SyncedFileSystem(
+				deleteAwaitLatch1, mkdirsAwaitLatch1, 
+				deleteTriggerLatch1, mkdirsTriggerLatch1);
+
+		final LocalFileSystem fs2 = new SyncedFileSystem(
+				deleteAwaitLatch2, mkdirsAwaitLatch2,
+				deletetriggerLatch2, mkdirsTriggerLatch2);
+
+		// start the concurrent file creators
+		FileCreator thread1 = new FileCreator(fs1, path1);
+		FileCreator thread2 = new FileCreator(fs2, path2);
+		thread1.start();
+		thread2.start();
+
+		// wait until they both decide to delete the directory
+		if (useAwaits) {
+			deleteAwaitLatch1.await();
+			deleteAwaitLatch2.await();
+		} else {
+			Thread.sleep(5);
+		}
+
+		// now send off #1 to delete the directory (it will pass the 'mkdirs' fast) and wait to create the file
+		mkdirsTriggerLatch1.trigger();
+		deleteTriggerLatch1.trigger();
+
+		if (useAwaits) {
+			createAwaitLatch.await();
+		} else {
+			// this needs a bit more sleep time, because here mockito is working
+			Thread.sleep(100);
+		}
+
+		// now send off #2 to delete the directory - it waits at 'mkdirs'
+		deletetriggerLatch2.trigger();
+		if (useAwaits) {
+			mkdirsAwaitLatch2.await();
+		} else {
+			Thread.sleep(5);
+		}
+
+		// let #1 try to create the file and see if it succeeded
+		createTriggerLatch.trigger();
+		if (useAwaits) {
+			thread1.sync();
+		} else {
+			Thread.sleep(5);
+		}
+
+		// now let #2 finish up
+		mkdirsTriggerLatch2.trigger();
+
+		thread1.sync();
+		thread2.sync();
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class FileCreator extends CheckedThread {
+
+		private final FileSystem fs;
+		private final Path path;
+
+		FileCreator(FileSystem fs, Path path) {
+			this.fs = fs;
+			this.path = path;
+		}
+
+		@Override
+		public void go() throws Exception {
+			fs.initOutPathLocalFS(path.getParent(), WriteMode.OVERWRITE, true);
+			try (FSDataOutputStream out = fs.create(path, true)) {
+				out.write(11);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class SyncedFileSystem extends LocalFileSystem {
+
+		private final OneShotLatch deleteTriggerLatch;
+		private final OneShotLatch mkdirsTriggerLatch;
+
+		private final OneShotLatch deleteAwaitLatch;
+		private final OneShotLatch mkdirsAwaitLatch;
+
+		SyncedFileSystem(
+				OneShotLatch deleteTriggerLatch,
+				OneShotLatch mkdirsTriggerLatch,
+				OneShotLatch deleteAwaitLatch,
+				OneShotLatch mkdirsAwaitLatch) {
+
+			this.deleteTriggerLatch = deleteTriggerLatch;
+			this.mkdirsTriggerLatch = mkdirsTriggerLatch;
+			this.deleteAwaitLatch = deleteAwaitLatch;
+			this.mkdirsAwaitLatch = mkdirsAwaitLatch;
+		}
+
+		@Override
+		public boolean delete(Path f, boolean recursive) throws IOException {
+			deleteTriggerLatch.trigger();
+			try {
+				deleteAwaitLatch.await();
+			}
+			catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+				throw new IOException("interrupted");
+			}
+
+			return super.delete(f, recursive);
+		}
+
+		@Override
+		public boolean mkdirs(Path f) throws IOException {
+			mkdirsTriggerLatch.trigger();
+			try {
+				mkdirsAwaitLatch.await();
+			}
+			catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+				throw new IOException("interrupted");
+			}
+
+			return super.mkdirs(f);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static final class NoOpLock extends ReentrantLock {
+
+		@Override
+		public void lock() {}
+
+		@Override
+		public void lockInterruptibly() {}
+
+		@Override
+		public void unlock() {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f3ad58b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index b3d86e5..d2eeb04 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -116,4 +116,9 @@ public final class OneShotLatch {
 			triggered = false;
 		}
 	}
+
+	@Override
+	public String toString() {
+		return "Latch " + (triggered ? "TRIGGERED" : "PENDING");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2f3ad58b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 251c465..8431226 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -309,43 +309,23 @@ public class TestBaseUtils extends TestLogger {
 			String resultPath,
 			String[] excludePrefixes) throws Exception {
 
-		// because of some strange I/O inconsistency effects on CI infrastructure, we need
-		// to retry this a few times
-		final int numAttempts = 5;
-		int attempt = 0;
-		while (true) {
-			try {
-				ArrayList<String> list = new ArrayList<>();
-				readAllResultLines(list, resultPath, excludePrefixes, false);
-
-				String[] result = list.toArray(new String[list.size()]);
-				Arrays.sort(result);
-
-				String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
-				Arrays.sort(expected);
-
-				if (expected.length != result.length || !Arrays.deepEquals(expected, result)) {
-					String msg = String.format(
-							"Different elements in arrays: expected %d elements and received %d\n" +
-							"files: %s\n expected: %s\n received: %s",
-							expected.length, result.length, 
-							Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)), 
-							Arrays.toString(expected), Arrays.toString(result));
-					fail(msg);
-				}
+		ArrayList<String> list = new ArrayList<>();
+		readAllResultLines(list, resultPath, excludePrefixes, false);
 
-				break;
-			}
-			catch (AssertionError e) {
-				if (++attempt > numAttempts) {
-					throw e;
-				}
+		String[] result = list.toArray(new String[list.size()]);
+		Arrays.sort(result);
 
-				// else wait, then fall through the loop and try again
-				// on normal setups, this should change nothing, but it seems to help the
-				// Travis CI container infrastructure
-				Thread.sleep(100);
-			}
+		String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
+		Arrays.sort(expected);
+
+		if (expected.length != result.length || !Arrays.deepEquals(expected, result)) {
+			String msg = String.format(
+					"Different elements in arrays: expected %d elements and received %d\n" +
+					"files: %s\n expected: %s\n received: %s",
+					expected.length, result.length, 
+					Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)), 
+					Arrays.toString(expected), Arrays.toString(result));
+			fail(msg);
 		}
 	}
 


[02/12] flink git commit: [hotfix] [tests] Improve exception message for file-based result verification in 'TestBaseUtils'

Posted by se...@apache.org.
[hotfix] [tests] Improve exception message for file-based result verification in 'TestBaseUtils'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47a61051
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47a61051
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47a61051

Branch: refs/heads/master
Commit: 47a61051a903be957d7cffc752d7c71990579c56
Parents: 6cfc841
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 13 12:05:06 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:32 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/test/util/TestBaseUtils.java  | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47a61051/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index b8470b3..251c465 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -76,6 +76,7 @@ import java.util.regex.Pattern;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class TestBaseUtils extends TestLogger {
 
@@ -323,8 +324,15 @@ public class TestBaseUtils extends TestLogger {
 				String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
 				Arrays.sort(expected);
 
-				Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length);
-				Assert.assertArrayEquals(expected, result);
+				if (expected.length != result.length || !Arrays.deepEquals(expected, result)) {
+					String msg = String.format(
+							"Different elements in arrays: expected %d elements and received %d\n" +
+							"files: %s\n expected: %s\n received: %s",
+							expected.length, result.length, 
+							Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)), 
+							Arrays.toString(expected), Arrays.toString(result));
+					fail(msg);
+				}
 
 				break;
 			}


[03/12] flink git commit: [tests] Add 'CheckedThread' as a common test utility

Posted by se...@apache.org.
[tests] Add 'CheckedThread' as a common test utility


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/790153c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/790153c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/790153c0

Branch: refs/heads/master
Commit: 790153c065f79ae0e8bb045b96c85f8195bc7a29
Parents: 3feea13
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 13 19:15:11 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:32 2016 +0100

----------------------------------------------------------------------
 .../partition/InputGateConcurrentTest.java      | 35 +-------
 .../flink/core/testutils/CheckedThread.java     | 85 ++++++++++++++++++++
 2 files changed, 86 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/790153c0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 8cae04c..27177c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -223,40 +224,6 @@ public class InputGateConcurrentTest {
 	//  testing threads
 	// ------------------------------------------------------------------------
 
-	private static abstract class CheckedThread extends Thread {
-
-		private volatile Throwable error;
-
-		public abstract void go() throws Exception;
-
-		@Override
-		public void run() {
-			try {
-				go();
-			}
-			catch (Throwable t) {
-				error = t;
-			}
-		}
-
-		public void sync() throws Exception {
-			join();
-
-			// propagate the error
-			if (error != null) {
-				if (error instanceof Error) {
-					throw (Error) error;
-				}
-				else if (error instanceof Exception) {
-					throw (Exception) error;
-				}
-				else {
-					throw new Exception(error.getMessage(), error);
-				}
-			}
-		}
-	}
-
 	private static class ProducerThread extends CheckedThread {
 
 		private final Random rnd = new Random();

http://git-wip-us.apache.org/repos/asf/flink/blob/790153c0/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
new file mode 100644
index 0000000..1dad8c8
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+/**
+ * A thread that additionally catches exceptions and offers a joining method that
+ * re-throws the exceptions.
+ *
+ * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one
+ * needs to extend this class and implement the {@link #go()} method. That method may
+ * throw exceptions.
+ *
+ * <p>Exceptions from the {@link #go()} method are caught and re-thrown when joining this
+ * thread via the {@link #sync()} method.
+ */
+public abstract class CheckedThread extends Thread {
+
+	/** The error thrown from the main work method */
+	private volatile Throwable error;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method needs to be overwritten to contain the main work logic.
+	 * It takes the role of {@link Thread#run()}, but should propagate exceptions.
+	 *
+	 * @throws Exception The exceptions thrown here will be re-thrown in the {@link #sync()} method.
+	 */
+	public abstract void go() throws Exception;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method is final - thread work should go into the {@link #go()} method instead.
+	 */
+	@Override
+	public final void run() {
+		try {
+			go();
+		}
+		catch (Throwable t) {
+			error = t;
+		}
+	}
+
+	/**
+	 * Waits until the thread is completed and checks whether any error occurred during
+	 * the execution.
+	 *
+	 * <p>This method blocks like {@link #join()}, but performs an additional check for
+	 * exceptions thrown from the {@link #go()} method.
+	 */
+	public void sync() throws Exception {
+		super.join();
+
+		// propagate the error
+		if (error != null) {
+			if (error instanceof Error) {
+				throw (Error) error;
+			}
+			else if (error instanceof Exception) {
+				throw (Exception) error;
+			}
+			else {
+				throw new Exception(error.getMessage(), error);
+			}
+		}
+	}
+}


[11/12] flink git commit: [tests] Remove redundant copies of the JUnit RetryRules

Posted by se...@apache.org.
[tests] Remove redundant copies of the JUnit RetryRules

The classes were moved to 'flink-test-utils-junit', but apparently copies remained in 'flink-core'.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4ff4804
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4ff4804
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4ff4804

Branch: refs/heads/master
Commit: a4ff4804a7029e5593b03c32074a386bc9d6102b
Parents: c906ad9
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 13 21:42:08 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:33 2016 +0100

----------------------------------------------------------------------
 .../flink/testutils/junit/RetryOnException.java |  60 -------
 .../testutils/junit/RetryOnExceptionTest.java   |  83 ---------
 .../flink/testutils/junit/RetryOnFailure.java   |  49 ------
 .../testutils/junit/RetryOnFailureTest.java     |  78 ---------
 .../apache/flink/testutils/junit/RetryRule.java | 170 -------------------
 5 files changed, 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a4ff4804/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnException.java b/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnException.java
deleted file mode 100644
index 080377b..0000000
--- a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnException.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.testutils.junit;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Annotation to use with {@link org.apache.flink.testutils.junit.RetryRule}.
- *
- * <p>Add the {@link org.apache.flink.testutils.junit.RetryRule} to your test and
- * annotate tests with {@link org.apache.flink.testutils.junit.RetryOnException}.
- *
- * <pre>
- * public class YourTest {
- *
- *     {@literal @}Rule
- *     public RetryRule retryRule = new RetryRule();
- *
- *     {@literal @}Test
- *     {@literal @}RetryOnException(times=1, exception=IOException.class)
- *     public void yourTest() throws Exception {
- *         // This will be retried 1 time (total runs 2) before failing the test.
- *         throw new IOException("Failing test");
- *     }
- *     
- *     {@literal @}Test
- *     {@literal @}RetryOnException(times=1, exception=IOException.class)
- *     public void yourTest() throws Exception {
- *         // This will not be retried, because it throws the wrong exception
- *         throw new IllegalStateException("Failing test");
- *     }
- * }
- * </pre>
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target(java.lang.annotation.ElementType.METHOD)
-public @interface RetryOnException {
-
-	int times();
-	
-	Class<? extends Throwable> exception();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ff4804/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnExceptionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnExceptionTest.java b/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnExceptionTest.java
deleted file mode 100644
index a7a6f4b..0000000
--- a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnExceptionTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.testutils.junit;
-
-import org.junit.AfterClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class RetryOnExceptionTest {
-
-	@Rule
-	public RetryRule retryRule = new RetryRule();
-
-	private static final int NUMBER_OF_RUNS = 3;
-	
-	private static int runsForSuccessfulTest = 0;
-
-	private static int runsForTestWithMatchingException = 0;
-
-	private static int runsForTestWithSubclassException = 0;
-	
-	private static int runsForPassAfterOneFailure = 0;
-
-	
-	@AfterClass
-	public static void verify() {
-		assertEquals(NUMBER_OF_RUNS + 1, runsForTestWithMatchingException);
-		assertEquals(NUMBER_OF_RUNS + 1, runsForTestWithSubclassException);
-		assertEquals(1, runsForSuccessfulTest);
-		assertEquals(2, runsForPassAfterOneFailure);
-	}
-
-	@Test
-	@RetryOnException(times = NUMBER_OF_RUNS, exception = IllegalArgumentException.class)
-	public void testSuccessfulTest() {
-		runsForSuccessfulTest++;
-	}
-
-	@Test
-	@RetryOnException(times = NUMBER_OF_RUNS, exception = IllegalArgumentException.class)
-	public void testMatchingException() {
-		runsForTestWithMatchingException++;
-		if (runsForTestWithMatchingException <= NUMBER_OF_RUNS) {
-			throw new IllegalArgumentException();
-		}
-	}
-
-	@Test
-	@RetryOnException(times = NUMBER_OF_RUNS, exception = RuntimeException.class)
-	public void testSubclassException() {
-		runsForTestWithSubclassException++;
-		if (runsForTestWithSubclassException <= NUMBER_OF_RUNS) {
-			throw new IllegalArgumentException();
-		}
-	}
-
-	@Test
-	@RetryOnException(times = NUMBER_OF_RUNS, exception = IllegalArgumentException.class)
-	public void testPassAfterOneFailure() {
-		runsForPassAfterOneFailure++;
-		if (runsForPassAfterOneFailure == 1) {
-			throw new IllegalArgumentException();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ff4804/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnFailure.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnFailure.java b/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnFailure.java
deleted file mode 100644
index 42b8ef6..0000000
--- a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnFailure.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.testutils.junit;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Annotation to use with {@link RetryRule}.
- *
- * <p>Add the {@link RetryRule} to your test and annotate tests with {@link RetryOnFailure}.
- *
- * <pre>
- * public class YourTest {
- *
- *     {@literal @}Rule
- *     public RetryRule retryRule = new RetryRule();
- *
- *     {@literal @}Test
- *     {@literal @}RetryOnFailure(times=1)
- *     public void yourTest() {
- *         // This will be retried 1 time (total runs 2) before failing the test.
- *         throw new Exception("Failing test");
- *     }
- * }
- * </pre>
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ java.lang.annotation.ElementType.METHOD })
-public @interface RetryOnFailure {
-	int times();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ff4804/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureTest.java b/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureTest.java
deleted file mode 100644
index efc7ba4..0000000
--- a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.testutils.junit;
-
-import org.junit.AfterClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class RetryOnFailureTest {
-
-	@Rule
-	public RetryRule retryRule = new RetryRule();
-
-	private static final int NUMBER_OF_RUNS = 5;
-
-	private static int numberOfFailedRuns;
-
-	private static int numberOfSuccessfulRuns;
-
-	private static boolean firstRun = true;
-
-	@AfterClass
-	public static void verify() throws Exception {
-		assertEquals(NUMBER_OF_RUNS + 1, numberOfFailedRuns);
-		assertEquals(3, numberOfSuccessfulRuns);
-	}
-
-	@Test
-	@RetryOnFailure(times = NUMBER_OF_RUNS)
-	public void testRetryOnFailure() throws Exception {
-		// All but the (expected) last run should be successful
-		if (numberOfFailedRuns < NUMBER_OF_RUNS) {
-			numberOfFailedRuns++;
-			throw new RuntimeException("Expected test exception");
-		}
-		else {
-			numberOfSuccessfulRuns++;
-		}
-	}
-
-	@Test
-	@RetryOnFailure(times = NUMBER_OF_RUNS)
-	public void testRetryOnceOnFailure() throws Exception {
-		if (firstRun) {
-			numberOfFailedRuns++;
-			firstRun = false;
-			throw new RuntimeException("Expected test exception");
-		}
-		else {
-			numberOfSuccessfulRuns++;
-		}
-	}
-
-	@Test
-	@RetryOnFailure(times = NUMBER_OF_RUNS)
-	public void testDontRetryOnSuccess() throws Exception {
-		numberOfSuccessfulRuns++;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ff4804/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java b/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java
deleted file mode 100644
index 782f4fb..0000000
--- a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.testutils.junit;
-
-import org.junit.Test;
-import org.junit.rules.TestRule;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A rule to retry failed tests for a fixed number of times.
- *
- * <p>Add the {@link RetryRule} to your test and annotate tests with {@link RetryOnFailure}.
- *
- * <pre>
- * public class YourTest {
- *
- *     {@literal @}Rule
- *     public RetryRule retryRule = new RetryRule();
- *
- *     {@literal @}Test
- *     {@literal @}RetryOnFailure(times=1)
- *     public void yourTest() {
- *         // This will be retried 1 time (total runs 2) before failing the test.
- *         throw new Exception("Failing test");
- *     }
- * }
- * </pre>
- */
-public class RetryRule implements TestRule {
-
-	public final static Logger LOG = LoggerFactory.getLogger(RetryRule.class);
-
-	@Override
-	public Statement apply(Statement statement, Description description) {
-		RetryOnFailure retryOnFailure = description.getAnnotation(RetryOnFailure.class);
-		RetryOnException retryOnException = description.getAnnotation(RetryOnException.class);
-
-		// sanity check that we don't use expected exceptions with the RetryOnX annotations
-		if (retryOnFailure != null || retryOnException != null) {
-			Test test = description.getAnnotation(Test.class);
-			if (test.expected() != Test.None.class) {
-				throw new IllegalArgumentException("You cannot combine the RetryOnFailure " +
-						"annotation with the Test(expected) annotation.");
-			}
-		}
-
-		// sanity check that we don't use both annotations
-		if (retryOnFailure != null && retryOnException != null) {
-			throw new IllegalArgumentException(
-					"You cannot combine the RetryOnFailure and RetryOnException annotations.");
-		}
-		
-		if (retryOnFailure != null) {
-			return new RetryOnFailureStatement(retryOnFailure.times(), statement);
-		}
-		else if (retryOnException != null) {
-			return new RetryOnExceptionStatement(retryOnException.times(), retryOnException.exception(), statement);
-		}
-		else {
-			return statement;
-		}
-	}
-
-	/**
-	 * Retries a test in case of a failure.
-	 */
-	private static class RetryOnFailureStatement extends Statement {
-
-		private final int timesOnFailure;
-
-		private int currentRun;
-
-		private final Statement statement;
-
-		private RetryOnFailureStatement(int timesOnFailure, Statement statement) {
-			checkArgument(timesOnFailure >= 0, "Negatives number of retries on failure");
-			this.timesOnFailure = timesOnFailure;
-			this.statement = statement;
-		}
-
-		/**
-		 * Retry a test in case of a failure.
-		 *
-		 * @throws Throwable
-		 */
-		@Override
-		public void evaluate() throws Throwable {
-			for (currentRun = 0; currentRun <= timesOnFailure; currentRun++) {
-				try {
-					statement.evaluate();
-					break; // success
-				}
-				catch (Throwable t) {
-					LOG.warn(String.format("Test run failed (%d/%d).",
-							currentRun, timesOnFailure + 1), t);
-
-					// Throw the failure if retried too often
-					if (currentRun == timesOnFailure) {
-						throw t;
-					}
-				}
-			}
-		}
-	}
-
-	/**
-	 * Retries a test in case of a failure.
-	 */
-	private static class RetryOnExceptionStatement extends Statement {
-
-		private final Class<? extends Throwable> exceptionClass;
-		private final int timesOnFailure;
-		private final Statement statement;
-		
-		private int currentRun;
-
-		private RetryOnExceptionStatement(int timesOnFailure, Class<? extends Throwable> exceptionClass, Statement statement) {
-			checkArgument(timesOnFailure >= 0, "Negatives number of retries on failure");
-			this.exceptionClass = checkNotNull(exceptionClass);
-			this.timesOnFailure = timesOnFailure;
-			this.statement = statement;
-		}
-
-		/**
-		 * Retry a test in case of a failure with a specific exception
-		 *
-		 * @throws Throwable
-		 */
-		@Override
-		public void evaluate() throws Throwable {
-			for (currentRun = 0; currentRun <= timesOnFailure; currentRun++) {
-				try {
-					statement.evaluate();
-					break; // success
-				}
-				catch (Throwable t) {
-					LOG.warn(String.format("Test run failed (%d/%d).", currentRun, timesOnFailure + 1), t);
-
-					if (!exceptionClass.isAssignableFrom(t.getClass()) || currentRun >= timesOnFailure) {
-						// Throw the failure if retried too often, or if it is the wrong exception
-						throw t;
-					}
-				}
-			}
-		}
-	}
-}


[06/12] flink git commit: [hotfix] [core] Minor code cleanups in 'LocalFileSystem'.

Posted by se...@apache.org.
[hotfix] [core] Minor code cleanups in 'LocalFileSystem'.

This makes members final where possible and avoids repeated access to the system properties.
This commit also brings the formatting style closer to the style of the other Flink classes.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3feea13f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3feea13f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3feea13f

Branch: refs/heads/master
Commit: 3feea13f2990654184743eb0dfe16d3e80824972
Parents: 7ecf6c8
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 13 18:47:16 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:32 2016 +0100

----------------------------------------------------------------------
 .../flink/core/fs/local/LocalBlockLocation.java | 12 +--
 .../flink/core/fs/local/LocalFileStatus.java    | 10 +--
 .../flink/core/fs/local/LocalFileSystem.java    | 86 ++++++++------------
 3 files changed, 37 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3feea13f/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
index fa3de66..9825781 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.core.fs.local;
 
 import java.io.IOException;
@@ -25,9 +24,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.BlockLocation;
 
 /**
- * Implementation of the {@link BlockLocation} interface for a
- * local file system.
- * 
+ * Implementation of the {@link BlockLocation} interface for a local file system.
  */
 @Internal
 public class LocalBlockLocation implements BlockLocation {
@@ -41,30 +38,23 @@ public class LocalBlockLocation implements BlockLocation {
 		this.length = length;
 	}
 
-
 	@Override
 	public String[] getHosts() throws IOException {
-
 		return this.hosts;
 	}
 
-
 	@Override
 	public long getLength() {
-
 		return this.length;
 	}
 
-
 	@Override
 	public long getOffset() {
 		return 0;
 	}
 
-
 	@Override
 	public int compareTo(final BlockLocation o) {
 		return 0;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3feea13f/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
index 3e127ff..63e999d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
@@ -29,7 +29,6 @@ import org.apache.flink.core.fs.Path;
 /**
  * The class <code>LocalFileStatus</code> provides an implementation of the {@link FileStatus} interface
  * for the local file system.
- * 
  */
 @Internal
 public class LocalFileStatus implements FileStatus {
@@ -57,48 +56,41 @@ public class LocalFileStatus implements FileStatus {
 		this.path = new Path(fs.getUri().getScheme() + ":" + f.toURI().getPath());
 	}
 
-
 	@Override
 	public long getAccessTime() {
 		return 0; // We don't have access files for local files
 	}
 
-
 	@Override
 	public long getBlockSize() {
 		return this.file.length();
 	}
 
-
 	@Override
 	public long getLen() {
 		return this.file.length();
 	}
 
-
 	@Override
 	public long getModificationTime() {
 		return this.file.lastModified();
 	}
 
-
 	@Override
 	public short getReplication() {
 		return 1; // For local files replication is always 1
 	}
 
-
 	@Override
 	public boolean isDir() {
 		return this.file.isDirectory();
 	}
 
-
 	@Override
 	public Path getPath() {
 		return this.path;
 	}
-	
+
 	public File getFile() {
 		return this.file;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3feea13f/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 7ad68b3..acbf814 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -25,16 +25,7 @@
 
 package org.apache.flink.core.fs.local;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.UnknownHostException;
-
 import org.apache.flink.annotation.Internal;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -43,63 +34,66 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.OperatingSystem;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+
 /**
- * The class <code>LocalFile</code> provides an implementation of the {@link FileSystem} interface for the local file
- * system.
- * 
+ * The class <code>LocalFileSystem</code> provides an implementation of the {@link FileSystem} interface
+ * for the local file system of the machine where the JVM runs.
  */
 @Internal
 public class LocalFileSystem extends FileSystem {
 
-	/**
-	 * Path pointing to the current working directory.
-	 */
-	private Path workingDir = null;
+	private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class);
 
-	/**
-	 * The URI representing the local file system.
-	 */
-	private final URI name = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");
+	/** The URI representing the local file system. */
+	private static final URI uri = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");
 
-	/**
-	 * The host name of this machine;
-	 */
-	private final String hostName;
+	/** Path pointing to the current working directory.
+	 * Because Paths are not immutable, we cannot cache the proper path here */
+	private final String workingDir;
 
-	private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class);
+	/** Path pointing to the user's home directory.
+	 * Because Paths are not immutable, we cannot cache the proper path here */
+	private final String homeDir;
+
+	/** The host name of this machine */
+	private final String hostName;
 
 	/**
 	 * Constructs a new <code>LocalFileSystem</code> object.
 	 */
 	public LocalFileSystem() {
-		this.workingDir = new Path(System.getProperty("user.dir")).makeQualified(this);
+		this.workingDir = new Path(System.getProperty("user.dir")).makeQualified(this).toString();
+		this.homeDir = new Path(System.getProperty("user.home")).toString();
 
 		String tmp = "unknownHost";
-
 		try {
 			tmp = InetAddress.getLocalHost().getHostName();
 		} catch (UnknownHostException e) {
 			LOG.error("Could not resolve local host", e);
 		}
-
 		this.hostName = tmp;
 	}
 
+	// ------------------------------------------------------------------------
 
 	@Override
-	public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
-			throws IOException {
-
-		final BlockLocation[] blockLocations = new BlockLocation[1];
-		blockLocations[0] = new LocalBlockLocation(this.hostName, file.getLen());
-
-		return blockLocations;
+	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
+		return new BlockLocation[] {
+				new LocalBlockLocation(hostName, file.getLen())
+		};
 	}
 
-
 	@Override
 	public FileStatus getFileStatus(Path f) throws IOException {
-
 		final File path = pathToFile(f);
 		if (path.exists()) {
 			return new LocalFileStatus(pathToFile(f), this);
@@ -110,40 +104,35 @@ public class LocalFileSystem extends FileSystem {
 		}
 	}
 
-
 	@Override
 	public URI getUri() {
-		return name;
+		return uri;
 	}
 
-
 	@Override
 	public Path getWorkingDirectory() {
-		return workingDir;
+		return new Path(workingDir);
 	}
 
 	@Override
 	public Path getHomeDirectory() {
-		return new Path(System.getProperty("user.home"));
+		return new Path(homeDir);
 	}
 
 	@Override
-	public void initialize(final URI name) throws IOException {	}
-
+	public void initialize(final URI name) throws IOException {}
 
 	@Override
 	public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
 		return open(f);
 	}
 
-
 	@Override
 	public FSDataInputStream open(final Path f) throws IOException {
 		final File file = pathToFile(f);
 		return new LocalDataInputStream(file);
 	}
 
-
 	private File pathToFile(Path path) {
 		if (!path.isAbsolute()) {
 			path = new Path(getWorkingDirectory(), path);
@@ -208,7 +197,6 @@ public class LocalFileSystem extends FileSystem {
 	private boolean delete(final File f) throws IOException {
 
 		if (f.isDirectory()) {
-
 			final File[] files = f.listFiles();
 			for (File file : files) {
 				final boolean del = delete(file);
@@ -216,7 +204,6 @@ public class LocalFileSystem extends FileSystem {
 					return false;
 				}
 			}
-
 		} else {
 			return f.delete();
 		}
@@ -234,7 +221,6 @@ public class LocalFileSystem extends FileSystem {
 	 *         thrown if an error occurred while creating the directory/directories
 	 */
 	public boolean mkdirs(final Path f) throws IOException {
-
 		final File p2f = pathToFile(f);
 
 		if(p2f.isDirectory()) {
@@ -266,14 +252,12 @@ public class LocalFileSystem extends FileSystem {
 
 	@Override
 	public FSDataOutputStream create(final Path f, final boolean overwrite) throws IOException {
-
 		return create(f, overwrite, 0, (short) 0, 0);
 	}
 
 
 	@Override
 	public boolean rename(final Path src, final Path dst) throws IOException {
-
 		final File srcFile = pathToFile(src);
 		final File dstFile = pathToFile(dst);
 


[12/12] flink git commit: [FLINK-5307] [metrics] Log reporter configuration

Posted by se...@apache.org.
[FLINK-5307] [metrics] Log reporter configuration

This closes #2979


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22a82041
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22a82041
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22a82041

Branch: refs/heads/master
Commit: 22a8204138fdb73115d9501b34a8867a2c4608c6
Parents: a4ff480
Author: zentol <ch...@apache.org>
Authored: Fri Dec 9 13:25:11 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:33 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/metrics/ganglia/GangliaReporter.java    | 2 ++
 .../java/org/apache/flink/metrics/graphite/GraphiteReporter.java  | 1 +
 .../src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java   | 1 +
 .../main/java/org/apache/flink/metrics/statsd/StatsDReporter.java | 3 +--
 .../java/org/apache/flink/runtime/metrics/MetricRegistry.java     | 1 +
 5 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/22a82041/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
index 15176a3..de9da74 100644
--- a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
+++ b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
@@ -71,6 +71,8 @@ public class GangliaReporter extends ScheduledDropwizardReporter {
 			builder.withDMax(dMax);
 			builder.withTMax(tMax);
 
+			log.info("Configured GangliaReporter with {host:{}, port:{}, dmax:{}, tmax:{}, ttl:{}, addressingMode:{}}",
+				host, port, dMax, tMax, ttl, addressingMode);			
 			return builder.build(gMetric);
 		} catch (IOException e) {
 			throw new RuntimeException("Error while instantiating GangliaReporter.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/22a82041/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
index 038bc3f..47a9d87 100644
--- a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
+++ b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
@@ -75,6 +75,7 @@ public class GraphiteReporter extends ScheduledDropwizardReporter {
 			prot = Protocol.TCP;
 		}
 
+		log.info("Configured GraphiteReporter with {host:{}, port:{}, protocol:{}}", host, port, prot);
 		switch(prot) {
 			case UDP:
 				return builder.build(new GraphiteUDP(host, port));				

http://git-wip-us.apache.org/repos/asf/flink/blob/22a82041/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
index 7186d30..f0c0fcb 100644
--- a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
@@ -127,6 +127,7 @@ public class JMXReporter implements MetricReporter {
 				throw new RuntimeException("Could not start JMX server on any configured port. Ports: " + portsConfig);
 			}
 		}
+		LOG.info("Configured JMXReporter with {port:{}}", portsConfig);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/22a82041/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index 977d1b4..42fe6a5 100644
--- a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -72,8 +72,6 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 
 		this.address = new InetSocketAddress(host, port);
 
-		LOG.info("Starting StatsDReporter to send metric reports to " + address);
-
 //		String conversionRate = config.getString(ARG_CONVERSION_RATE, "SECONDS");
 //		String conversionDuration = config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS");
 //		this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1);
@@ -84,6 +82,7 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 		} catch (SocketException e) {
 			throw new RuntimeException("Could not create datagram socket. ", e);
 		}
+		log.info("Configured StatsDReporter with {host:{}, port:{}}", host, port);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/22a82041/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index f4510db..d3b21fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -116,6 +116,7 @@ public class MetricRegistry {
 
 					MetricConfig metricConfig = new MetricConfig();
 					reporterConfig.addAllToProperties(metricConfig);
+					LOG.info("Configuring {} with {}.", reporterClass.getSimpleName(), metricConfig);
 					reporterInstance.open(metricConfig);
 
 					if (reporterInstance instanceof Scheduled) {


[05/12] flink git commit: [FLINK-5327] Remove IOException from StateObject::getStateSize

Posted by se...@apache.org.
[FLINK-5327] Remove IOException from StateObject::getStateSize

This closes #2993


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3560f2e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3560f2e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3560f2e8

Branch: refs/heads/master
Commit: 3560f2e819425326cc93b4e019a615b7752b98e6
Parents: 47a6105
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Dec 12 14:02:25 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:32 2016 +0100

----------------------------------------------------------------------
 .../flink/migration/state/MigrationStreamStateHandle.java       | 2 +-
 .../java/org/apache/flink/runtime/state/ChainedStateHandle.java | 3 +--
 .../org/apache/flink/runtime/state/KeyGroupsStateHandle.java    | 2 +-
 .../org/apache/flink/runtime/state/MultiStreamStateHandle.java  | 2 +-
 .../org/apache/flink/runtime/state/OperatorStateHandle.java     | 2 +-
 .../flink/runtime/state/RetrievableStreamStateHandle.java       | 2 +-
 .../main/java/org/apache/flink/runtime/state/StateObject.java   | 5 +----
 .../apache/flink/runtime/state/filesystem/FileStateHandle.java  | 3 +--
 .../checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java     | 3 +--
 .../streaming/runtime/tasks/InterruptSensitiveRestoreTest.java  | 2 +-
 10 files changed, 10 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3560f2e8/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
index e7aa788..e2da757 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
@@ -50,7 +50,7 @@ public class MigrationStreamStateHandle implements StreamStateHandle {
 	}
 
 	@Override
-	public long getStateSize() throws IOException {
+	public long getStateSize() {
 		return delegate.getStateSize();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3560f2e8/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
index a807428..27ef576 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
@@ -85,7 +84,7 @@ public class ChainedStateHandle<T extends StateObject> implements StateObject {
 	}
 
 	@Override
-	public long getStateSize() throws IOException {
+	public long getStateSize() {
 		long sumStateSize = 0;
 
 		if (operatorStateHandles != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3560f2e8/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 1d277b3..b454e42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -111,7 +111,7 @@ public class KeyGroupsStateHandle implements StreamStateHandle {
 	}
 
 	@Override
-	public long getStateSize() throws IOException {
+	public long getStateSize() {
 		return stateHandle.getStateSize();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3560f2e8/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
index 7492262..b95dace 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
@@ -58,7 +58,7 @@ public class MultiStreamStateHandle implements StreamStateHandle {
 	}
 
 	@Override
-	public long getStateSize() throws IOException {
+	public long getStateSize() {
 		return stateSize;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3560f2e8/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
index 1ad41ea..3cd37c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
@@ -55,7 +55,7 @@ public class OperatorStateHandle implements StreamStateHandle {
 	}
 
 	@Override
-	public long getStateSize() throws IOException {
+	public long getStateSize() {
 		return delegateStateHandle.getStateSize();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3560f2e8/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
index 29d21ac..653e227 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
@@ -70,7 +70,7 @@ public class RetrievableStreamStateHandle<T extends Serializable> implements
 	}
 
 	@Override
-	public long getStateSize() throws IOException {
+	public long getStateSize() {
 		return wrappedStreamStateHandle.getStateSize();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3560f2e8/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
index a502b9d..9ff2fa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import java.io.IOException;
-
 /**
  * Base of all types that represent checkpointed state. Specializations are for
  * example {@link StateHandle StateHandles} (directly resolve to state).
@@ -45,7 +43,6 @@ public interface StateObject extends java.io.Serializable {
 	 * <p>If the the size is not known, return {@code 0}.
 	 *
 	 * @return Size of the state in bytes.
-	 * @throws IOException If the operation fails during size retrieval.
 	 */
-	long getStateSize() throws IOException;
+	long getStateSize();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3560f2e8/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index 4b2d350..bdf3f42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -91,10 +91,9 @@ public class FileStateHandle implements StreamStateHandle {
 	 * Returns the file size in bytes.
 	 *
 	 * @return The file size in bytes.
-	 * @throws IOException Thrown if the file system cannot be accessed.
 	 */
 	@Override
-	public long getStateSize() throws IOException {
+	public long getStateSize() {
 		return stateSize;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3560f2e8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index f46f7d2..046adba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -30,7 +30,6 @@ import org.junit.Test;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.concurrent.TimeUnit;
 
@@ -184,7 +183,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		}
 
 		@Override
-		public long getStateSize() throws IOException {
+		public long getStateSize() {
 			return 0;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3560f2e8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 6cde30f..1207cbb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -249,7 +249,7 @@ public class InterruptSensitiveRestoreTest {
 		public void discardState() throws Exception {}
 
 		@Override
-		public long getStateSize() throws IOException {
+		public long getStateSize() {
 			return 0;
 		}
 	}


[08/12] flink git commit: [hotfix] [core] Fix typo in variable name

Posted by se...@apache.org.
[hotfix] [core] Fix typo in variable name

This closes #3002


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4c767a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4c767a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4c767a3

Branch: refs/heads/master
Commit: e4c767a372fb0db840b8288a54d8712c660779f5
Parents: ef6b473
Author: Raghav <ra...@gmail.com>
Authored: Tue Dec 13 21:31:25 2016 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:33 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/api/common/io/GlobFilePathFilter.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4c767a3/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
index 4aaf481..a3a78ae 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
@@ -90,8 +90,8 @@ public class GlobFilePathFilter extends FilePathFilter {
 			return false;
 		}
 
-		for (PathMatcher mather : includeMatchers) {
-			if (mather.matches(Paths.get(filePath.getPath()))) {
+		for (PathMatcher matcher : includeMatchers) {
+			if (matcher.matches(Paths.get(filePath.getPath()))) {
 				return shouldExclude(filePath);
 			}
 		}


[09/12] flink git commit: [FLINK-5289] [streaming] Give meaningful exceptions when using value state on non-keyed stream

Posted by se...@apache.org.
[FLINK-5289] [streaming] Give meaningful exceptions when using value state on non-keyed stream

This closes #2969


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c906ad90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c906ad90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c906ad90

Branch: refs/heads/master
Commit: c906ad90ff6410c182995b77b7fd2eed32754989
Parents: 2f3ad58
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Dec 8 15:07:58 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:33 2016 +0100

----------------------------------------------------------------------
 .../api/operators/StreamingRuntimeContext.java  | 32 +++++++++++++++-----
 1 file changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c906ad90/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index fc9e39e..b450923 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -22,10 +22,12 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.runtime.execution.Environment;
@@ -33,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 import java.util.Map;
@@ -46,12 +49,12 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 
 	/** The operator to which this function belongs */
 	private final AbstractStreamOperator<?> operator;
-	
+
 	/** The task environment running the operator */
 	private final Environment taskEnvironment;
 
 	private final StreamConfig streamConfig;
-	
+
 	public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
 									Environment env, Map<String, Accumulator<?, ?>> accumulators) {
 		super(env.getTaskInfo(),
@@ -60,17 +63,17 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 				accumulators,
 				env.getDistributedCacheEntries(),
 				operator.getMetricGroup());
-		
+
 		this.operator = operator;
 		this.taskEnvironment = env;
 		this.streamConfig = new StreamConfig(env.getTaskConfiguration());
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Returns the input split provider associated with the operator.
-	 * 
+	 *
 	 * @return The input split provider.
 	 */
 	public InputSplitProvider getInputSplitProvider() {
@@ -106,17 +109,30 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 
 	@Override
 	public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
-		return operator.getKeyedStateStore().getState(stateProperties);
+		KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+		stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+		return keyedStateStore.getState(stateProperties);
 	}
 
 	@Override
 	public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
-		return operator.getKeyedStateStore().getListState(stateProperties);
+		KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+		stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+		return keyedStateStore.getListState(stateProperties);
 	}
 
 	@Override
 	public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
-		return operator.getKeyedStateStore().getReducingState(stateProperties);
+		KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+		stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+		return keyedStateStore.getReducingState(stateProperties);
+	}
+
+	private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor<?, ?> stateDescriptor) {
+		Preconditions.checkNotNull(stateDescriptor, "The state properties must not be null");
+		KeyedStateStore keyedStateStore = operator.getKeyedStateStore();
+		Preconditions.checkNotNull(keyedStateStore, "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.");
+		return keyedStateStore;
 	}
 
 	// ------------------ expose (read only) relevant information from the stream config -------- //


[04/12] flink git commit: [hotfix] Clean up structure and comments in 'FileSystem'

Posted by se...@apache.org.
[hotfix] Clean up structure and comments in 'FileSystem'

This commit aims to improve the readability of the FileSystem class.
The commit does not introduce new or different code; it introduces sections in the class and moves nested
classes and methods between these sections.
The commit also improves comments for the class and nested classes.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ecf6c86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ecf6c86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ecf6c86

Branch: refs/heads/master
Commit: 7ecf6c86de2a782ba0aa34439c3387da82f42808
Parents: 3560f2e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 13 16:54:39 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:32 2016 +0100

----------------------------------------------------------------------
 flink-core/pom.xml                              |   1 +
 .../org/apache/flink/core/fs/FileSystem.java    | 194 ++++++++++---------
 2 files changed, 100 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ecf6c86/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index ffbfe70..396ef86 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -125,6 +125,7 @@ under the License.
 					<parameter>
 						<excludes combine.children="append">
 							<exclude>org.apache.flink.api.common.ExecutionConfig#CONFIG_KEY</exclude>
+							<exclude>org.apache.flink.core.fs.FileSystem$FSKey</exclude>
 							<exclude>org.apache.flink.api.java.typeutils.WritableTypeInfo</exclude>
 						</excludes>
 					</parameter>

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecf6c86/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 433cec0..3dced6f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -28,11 +28,14 @@ package org.apache.flink.core.fs;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.OperatingSystem;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -42,17 +45,38 @@ import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * An abstract base class for a fairly generic file system. It
- * may be implemented as a distributed file system, or as a local
- * one that reflects the locally-connected disk.
+ * Abstract base class of all file systems used by Flink. This class may be extended to implement
+ * distributed file systems, or local file systems. The abstraction by this file system is very simple,
+ * and the set of allowed operations quite limited, to support the common denominator of a wide
+ * range of file systems. For example, appending to or mutating existing files is not supported.
+ * 
+ * <p>Flink implements and supports some file system types directly (for example the default
+ * machine-local file system). Other file system types are accessed by an implementation that bridges
+ * to the suite of file systems supported by Hadoop (such as for example HDFS).
  */
 @Public
 public abstract class FileSystem {
 
-	private static final InheritableThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new InheritableThreadLocal<>();
+	/**
+	 * The possible write modes. The write mode decides what happens if a file should be created,
+	 * but already exists.
+	 */
+	public enum WriteMode {
+
+		/** Creates the target file if it does not exist. Does not overwrite existing files and directories. */
+		NO_OVERWRITE,
+
+		/** Creates a new target file regardless of any existing files or directories. Existing files and
+		 * directories will be removed/overwritten. */
+		OVERWRITE
+	}
 
-	private static final String LOCAL_FILESYSTEM_CLASS = "org.apache.flink.core.fs.local.LocalFileSystem";
+	// ------------------------------------------------------------------------
+
+	private static final InheritableThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new InheritableThreadLocal<>();
 
 	private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
 
@@ -98,89 +122,6 @@ public abstract class FileSystem {
 	private static final Object SYNCHRONIZATION_OBJECT = new Object();
 
 	/**
-	 * Enumeration for write modes.
-	 *
-	 */
-	public enum WriteMode {
-
-		/** Creates write path if it does not exist. Does not overwrite existing files and directories. */
-		NO_OVERWRITE,
-
-		/** Creates write path if it does not exist. Overwrites existing files and directories. */
-		OVERWRITE
-	}
-
-	/**
-	 * An auxiliary class to identify a file system by its scheme and its authority.
-	 */
-	public static class FSKey {
-
-		/**
-		 * The scheme of the file system.
-		 */
-		private String scheme;
-
-		/**
-		 * The authority of the file system.
-		 */
-		private String authority;
-
-		/**
-		 * Creates a file system key from a given scheme and an
-		 * authority.
-		 *
-		 * @param scheme
-		 *        the scheme of the file system
-		 * @param authority
-		 *        the authority of the file system
-		 */
-		public FSKey(final String scheme, final String authority) {
-			this.scheme = scheme;
-			this.authority = authority;
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public boolean equals(final Object obj) {
-			if (obj == this) {
-				return true;
-			}
-
-			if (obj instanceof FSKey) {
-				final FSKey key = (FSKey) obj;
-
-				if (!this.scheme.equals(key.scheme)) {
-					return false;
-				}
-
-				if ((this.authority == null) || (key.authority == null)) {
-					return this.authority == null && key.authority == null;
-				}
-				return this.authority.equals(key.authority);
-			}
-			return false;
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public int hashCode() {
-			if (this.scheme != null) {
-				return this.scheme.hashCode();
-			}
-
-			if (this.authority != null) {
-				return this.authority.hashCode();
-			}
-
-			return super.hashCode();
-		}
-	}
-
-	/**
 	 * Data structure mapping file system keys (scheme + authority) to cached file system objects.
 	 */
 	private static final Map<FSKey, FileSystem> CACHE = new HashMap<FSKey, FileSystem>();
@@ -193,7 +134,7 @@ public abstract class FileSystem {
 	static {
 		FSDIRECTORY.put("hdfs", HADOOP_WRAPPER_FILESYSTEM_CLASS);
 		FSDIRECTORY.put("maprfs", MAPR_FILESYSTEM_CLASS);
-		FSDIRECTORY.put("file", LOCAL_FILESYSTEM_CLASS);
+		FSDIRECTORY.put("file", LocalFileSystem.class.getName());
 	}
 
 	/**
@@ -405,6 +346,11 @@ public abstract class FileSystem {
 		return hadoopWrapper.getHadoopWrapperClassNameForFileSystem(scheme);
 	}
 
+
+	// ------------------------------------------------------------------------
+	//  File System Methods
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Returns the path of the file system's current working directory.
 	 *
@@ -577,6 +523,16 @@ public abstract class FileSystem {
 	 */
 	public abstract boolean rename(Path src, Path dst) throws IOException;
 
+	/**
+	 * Returns true if this is a distributed file system, false otherwise.
+	 *
+	 * @return True, if this is a distributed file system, false otherwise.
+	 */
+	public abstract boolean isDistributedFS();
+
+	// ------------------------------------------------------------------------
+	//  output directory initialization
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Initializes output directories on local file systems according to the given write mode.
@@ -814,15 +770,63 @@ public abstract class FileSystem {
 
 	}
 
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static Class<? extends FileSystem> getFileSystemByName(String className) throws ClassNotFoundException {
+		return Class.forName(className, true, FileSystem.class.getClassLoader()).asSubclass(FileSystem.class);
+	}
+
 	/**
-	 * Returns true if this is a distributed file system, false otherwise.
-	 *
-	 * @return True if this is a distributed file system, false otherwise.
+	 * An identifier of a file system, via its scheme and its authority.
+	 * This class needs to stay public, because it is detected as part of the public API.
 	 */
-	public abstract boolean isDistributedFS();
+	public static class FSKey {
 
+		/** The scheme of the file system. */
+		private final String scheme;
 
-	private static Class<? extends FileSystem> getFileSystemByName(String className) throws ClassNotFoundException {
-		return Class.forName(className, true, FileSystem.class.getClassLoader()).asSubclass(FileSystem.class);
+		/** The authority of the file system. */
+		@Nullable
+		private final String authority;
+
+		/**
+		 * Creates a file system key from a given scheme and an authority.
+		 *
+		 * @param scheme     The scheme of the file system
+		 * @param authority  The authority of the file system
+		 */
+		public FSKey(String scheme, @Nullable String authority) {
+			this.scheme = checkNotNull(scheme, "scheme");
+			this.authority = authority;
+		}
+
+		@Override
+		public boolean equals(final Object obj) {
+			if (obj == this) {
+				return true;
+			}
+			else if (obj != null && obj.getClass() == FSKey.class) {
+				final FSKey that = (FSKey) obj;
+				return this.scheme.equals(that.scheme) &&
+						(this.authority == null ? that.authority == null :
+								(that.authority != null && this.authority.equals(that.authority)));
+			}
+			else {
+				return false;
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return 31 * scheme.hashCode() + 
+					(authority == null ? 17 : authority.hashCode());
+		}
+
+		@Override
+		public String toString() {
+			return scheme + "://" + authority;
+		}
 	}
 }


[10/12] flink git commit: [FLINK-5330] [tests] Harden KafkaConsumer08Test to fail reliably with unknown host exception

Posted by se...@apache.org.
[FLINK-5330] [tests] Harden KafkaConsumer08Test to fail reliably with unknown host exception

Using static mocking to reliably fail the InetAddress.getByName call with a UnknownHostException.
Furthermore, the PR decreases the connection timeouts which speeds the test execution up.

This closes #2998


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef6b473a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef6b473a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef6b473a

Branch: refs/heads/master
Commit: ef6b473aa6705368a50930bd4205f7cc2e0f6584
Parents: a5cf88f
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Dec 13 15:48:06 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:33 2016 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumer08Test.java   | 38 +++++++++++++++++---
 1 file changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef6b473a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
index 9520f55..5ae74d7 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
@@ -21,15 +21,29 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.powermock.api.mockito.PowerMockito.when;
 
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.Properties;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.NetUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKafkaConsumer08.class)
+@PowerMockIgnore("javax.management.*")
 public class KafkaConsumer08Test {
 
 	@Test
@@ -80,6 +94,7 @@ public class KafkaConsumer08Test {
 			props.setProperty("zookeeper.connect", "localhost:56794");
 			props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
 			props.setProperty("group.id", "non-existent-group");
+			props.setProperty(FlinkKafkaConsumer08.GET_PARTITIONS_RETRIES_KEY, "1");
 
 			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
 			consumer.open(new Configuration());
@@ -93,10 +108,16 @@ public class KafkaConsumer08Test {
 	@Test
 	public void testAllBoostrapServerHostsAreInvalid() {
 		try {
+			String unknownHost = "foobar:11111";
+
+			URL unknownHostURL = NetUtils.getCorrectHostnamePort(unknownHost);
+
+			PowerMockito.mockStatic(InetAddress.class);
+			when(InetAddress.getByName(Matchers.eq(unknownHostURL.getHost()))).thenThrow(new UnknownHostException("Test exception"));
+
 			String zookeeperConnect = "localhost:56794";
-			String bootstrapServers = "indexistentHost:11111";
 			String groupId = "non-existent-group";
-			Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
+			Properties props = createKafkaProps(zookeeperConnect, unknownHost, groupId);
 			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
 					new SimpleStringSchema(), props);
 			consumer.open(new Configuration());
@@ -112,9 +133,16 @@ public class KafkaConsumer08Test {
 	public void testAtLeastOneBootstrapServerHostIsValid() {
 		try {
 			String zookeeperConnect = "localhost:56794";
-			// we declare one valid boostrap server, namely the one with
+			String unknownHost = "foobar:11111";
+			// we declare one valid bootstrap server, namely the one with
 			// 'localhost'
-			String bootstrapServers = "indexistentHost:11111, localhost:22222";
+			String bootstrapServers = unknownHost + ", localhost:22222";
+
+			URL unknownHostURL = NetUtils.getCorrectHostnamePort(unknownHost);
+
+			PowerMockito.mockStatic(InetAddress.class);
+			when(InetAddress.getByName(Matchers.eq(unknownHostURL.getHost()))).thenThrow(new UnknownHostException("Test exception"));
+
 			String groupId = "non-existent-group";
 			Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
 			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
@@ -134,6 +162,8 @@ public class KafkaConsumer08Test {
 		props.setProperty("zookeeper.connect", zookeeperConnect);
 		props.setProperty("bootstrap.servers", bootstrapServers);
 		props.setProperty("group.id", groupId);
+		props.setProperty("socket.timeout.ms", "100");
+		props.setProperty(FlinkKafkaConsumer08.GET_PARTITIONS_RETRIES_KEY, "1");
 		return props;
 	}
 }


[07/12] flink git commit: [FLINK-5002] [network] Renamed getNumberOfUsedBuffers() method to bestEffortGetNumOfUsedBuffers()

Posted by se...@apache.org.
[FLINK-5002] [network] Renamed getNumberOfUsedBuffers() method to bestEffortGetNumOfUsedBuffers()

This closes #2865


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a5cf88f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a5cf88f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a5cf88f7

Branch: refs/heads/master
Commit: a5cf88f7ff1fcd46bada0cf12c517c1667b21751
Parents: 22a8204
Author: Roman Maier <ro...@epam.com>
Authored: Fri Nov 18 17:51:58 2016 +0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:33 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/io/network/buffer/BufferPool.java   | 2 +-
 .../apache/flink/runtime/io/network/buffer/LocalBufferPool.java  | 4 ++--
 .../apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java   | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a5cf88f7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
index ae21a84..8784b14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
@@ -71,6 +71,6 @@ public interface BufferPool extends BufferProvider, BufferRecycler {
 	/**
 	 * Returns the number of used buffers of this buffer pool.
 	 */
-	int getNumberOfUsedBuffers();
+	int bestEffortGetNumOfUsedBuffers();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a5cf88f7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 68c1bde..86e6870 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -111,8 +111,8 @@ class LocalBufferPool implements BufferPool {
 	}
 
 	@Override
-	public int getNumberOfUsedBuffers() {
-		return numberOfRequestedMemorySegments - availableMemorySegments.size();
+	public int bestEffortGetNumOfUsedBuffers() {
+		return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size());
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a5cf88f7/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index fcea098..38accad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -183,7 +183,7 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 			int bufferPoolSize = 0;
 
 			for (SingleInputGate inputGate : task.getAllInputGates()) {
-				usedBuffers += inputGate.getBufferPool().getNumberOfUsedBuffers();
+				usedBuffers += inputGate.getBufferPool().bestEffortGetNumOfUsedBuffers();
 				bufferPoolSize += inputGate.getBufferPool().getNumBuffers();
 			}
 
@@ -212,7 +212,7 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 			int bufferPoolSize = 0;
 
 			for (ResultPartition resultPartition : task.getProducedPartitions()) {
-				usedBuffers += resultPartition.getBufferPool().getNumberOfUsedBuffers();
+				usedBuffers += resultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers();
 				bufferPoolSize += resultPartition.getBufferPool().getNumBuffers();
 			}