You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/01/08 19:22:07 UTC

[1/9] flink git commit: [FLINK-8280][checkstyle] enable and fix checkstyle in BlobServer and BlobUtils

Repository: flink
Updated Branches:
  refs/heads/master 2558ae511 -> f16335d42


[FLINK-8280][checkstyle] enable and fix checkstyle in BlobServer and BlobUtils

This closes #5175.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50301254
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50301254
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50301254

Branch: refs/heads/master
Commit: 50301254182283433d52b5359b1afa6093d0514b
Parents: 3cdc5d1
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Dec 18 13:22:13 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 8 17:29:51 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobClient.java   |  2 +-
 .../flink/runtime/blob/BlobInputStream.java     |  6 +--
 .../org/apache/flink/runtime/blob/BlobKey.java  | 12 ++---
 .../apache/flink/runtime/blob/BlobServer.java   | 46 ++++++++++----------
 .../flink/runtime/blob/BlobServerProtocol.java  |  5 +++
 .../apache/flink/runtime/blob/BlobUtils.java    | 12 ++---
 .../flink/runtime/blob/FileSystemBlobStore.java |  8 ++--
 .../flink/runtime/blob/BlobCacheGetTest.java    |  2 +-
 .../flink/runtime/blob/BlobClientSslTest.java   | 40 ++++++++---------
 .../flink/runtime/blob/BlobClientTest.java      | 12 ++---
 .../runtime/blob/TestingFailingBlobServer.java  |  2 +-
 tools/maven/suppressions-runtime.xml            |  7 ---
 12 files changed, 76 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50301254/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index fbcce58..8e6b328 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -119,7 +119,7 @@ public final class BlobClient implements Closeable {
 			}
 
 		}
-		catch(Exception e) {
+		catch (Exception e) {
 			BlobUtils.closeSilently(socket, LOG);
 			throw new IOException("Could not connect to BlobServer at address " + serverAddress, e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/50301254/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
index 7a73917..ad318bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
@@ -50,7 +50,7 @@ final class BlobInputStream extends InputStream {
 	private final OutputStream wrappedOutputStream;
 
 	/**
-	 * The BLOB key if the GET operation has been performed on a content-addressable BLOB, otherwise <code>null<code>.
+	 * The BLOB key if the GET operation has been performed on a content-addressable BLOB, otherwise <code>null</code>.
 	 */
 	private final BlobKey blobKey;
 
@@ -72,7 +72,7 @@ final class BlobInputStream extends InputStream {
 
 	/**
 	 * Constructs a new BLOB input stream.
-	 * 
+	 *
 	 * @param wrappedInputStream
 	 *        the underlying input stream to read from
 	 * @param blobKey
@@ -98,7 +98,7 @@ final class BlobInputStream extends InputStream {
 
 	/**
 	 * Convenience method to throw an {@link EOFException}.
-	 * 
+	 *
 	 * @throws EOFException
 	 *         thrown to indicate the underlying input stream did not provide as much data as expected
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/50301254/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
index 4b1d498..988af8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
@@ -199,14 +199,14 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 
 	/**
 	 * Adds the BLOB key to the given {@link MessageDigest}.
-	 * 
+	 *
 	 * @param md
 	 *        the message digest to add the BLOB key to
 	 */
 	public void addToMessageDigest(MessageDigest md) {
 		md.update(this.key);
 	}
-	
+
 	@Override
 	public boolean equals(final Object obj) {
 
@@ -252,7 +252,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 		final byte[] aarr = this.key;
 		final byte[] barr = o.key;
 		final int len = Math.min(aarr.length, barr.length);
-	
+
 		for (int i = 0; i < len; ++i) {
 			final int a = (aarr[i] & 0xff);
 			final int b = (barr[i] & 0xff);
@@ -274,12 +274,12 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 			return aarr.length - barr.length;
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
 	 * Auxiliary method to read a BLOB key from an input stream.
-	 * 
+	 *
 	 * @param inputStream
 	 *        the input stream to read the BLOB key from
 	 * @return the read BLOB key
@@ -331,7 +331,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 
 	/**
 	 * Auxiliary method to write this BLOB key to an output stream.
-	 * 
+	 *
 	 * @param outputStream
 	 *        the output stream to write the BLOB key to
 	 * @throws IOException

http://git-wip-us.apache.org/repos/asf/flink/blob/50301254/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 01fb808..a3522bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -77,28 +77,28 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
 	/** The server socket listening for incoming connections. */
 	private final ServerSocket serverSocket;
 
-	/** The SSL server context if ssl is enabled for the connections */
+	/** The SSL server context if ssl is enabled for the connections. */
 	private final SSLContext serverSSLContext;
 
-	/** Blob Server configuration */
+	/** Blob Server configuration. */
 	private final Configuration blobServiceConfiguration;
 
 	/** Indicates whether a shutdown of server component has been requested. */
 	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
 
-	/** Root directory for local file storage */
+	/** Root directory for local file storage. */
 	private final File storageDir;
 
-	/** Blob store for distributed file storage, e.g. in HA */
+	/** Blob store for distributed file storage, e.g. in HA. */
 	private final BlobStore blobStore;
 
-	/** Set of currently running threads */
+	/** Set of currently running threads. */
 	private final Set<BlobServerConnection> activeConnections = new HashSet<>();
 
-	/** The maximum number of concurrent connections */
+	/** The maximum number of concurrent connections. */
 	private final int maxConnections;
 
-	/** Lock guarding concurrent file accesses */
+	/** Lock guarding concurrent file accesses. */
 	private final ReadWriteLock readWriteLock;
 
 	/**
@@ -201,8 +201,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
 			}
 		});
 
-		if(socketAttempt == null) {
-			throw new IOException("Unable to allocate socket for blob server in specified port range: "+serverPortRange);
+		if (socketAttempt == null) {
+			throw new IOException("Unable to allocate socket for blob server in specified port range: " + serverPortRange);
 		} else {
 			SSLUtils.setSSLVerAndCipherSuites(socketAttempt, config);
 			this.serverSocket = socketAttempt;
@@ -254,7 +254,7 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
 	}
 
 	/**
-	 * Returns the lock used to guard file accesses
+	 * Returns the lock used to guard file accesses.
 	 */
 	ReadWriteLock getReadWriteLock() {
 		return readWriteLock;
@@ -360,7 +360,7 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
 				}
 			}
 
-			if(LOG.isInfoEnabled()) {
+			if (LOG.isInfoEnabled()) {
 				LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(), getPort());
 			}
 
@@ -375,8 +375,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
 
 	/**
 	 * Retrieves the local path of a (job-unrelated) file associated with a job and a blob key.
-	 * <p>
-	 * The blob server looks the blob key up in its local storage. If the file exists, it is
+	 *
+	 * <p>The blob server looks the blob key up in its local storage. If the file exists, it is
 	 * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
 	 * or a {@link FileNotFoundException} is thrown.
 	 *
@@ -395,8 +395,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
 
 	/**
 	 * Retrieves the local path of a file associated with a job and a blob key.
-	 * <p>
-	 * The blob server looks the blob key up in its local storage. If the file exists, it is
+	 *
+	 * <p>The blob server looks the blob key up in its local storage. If the file exists, it is
 	 * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
 	 * or a {@link FileNotFoundException} is thrown.
 	 *
@@ -419,8 +419,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
 	/**
 	 * Returns the path to a local copy of the file associated with the provided job ID and blob
 	 * key.
-	 * <p>
-	 * We will first attempt to serve the BLOB from the local storage. If the BLOB is not in
+	 *
+	 * <p>We will first attempt to serve the BLOB from the local storage. If the BLOB is not in
 	 * there, we will try to download it from the HA store.
 	 *
 	 * @param jobId
@@ -443,8 +443,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
 
 	/**
 	 * Retrieves the local path of a file associated with a job and a blob key.
-	 * <p>
-	 * The blob server looks the blob key up in its local storage. If the file exists, it is
+	 *
+	 * <p>The blob server looks the blob key up in its local storage. If the file exists, it is
 	 * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
 	 * or a {@link FileNotFoundException} is thrown.
 	 *
@@ -474,12 +474,12 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
 
 	/**
 	 * Helper to retrieve the local path of a file associated with a job and a blob key.
-	 * <p>
-	 * The blob server looks the blob key up in its local storage. If the file exists, it is
+	 *
+	 * <p>The blob server looks the blob key up in its local storage. If the file exists, it is
 	 * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
 	 * or a {@link FileNotFoundException} is thrown.
-	 * <p>
-	 * <strong>Assumes the read lock has already been acquired.</strong>
+	 *
+	 * <p><strong>Assumes the read lock has already been acquired.</strong>
 	 *
 	 * @param jobId
 	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)

http://git-wip-us.apache.org/repos/asf/flink/blob/50301254/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
index 5c9c7b0..d158428 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
@@ -15,8 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.blob;
 
+/**
+ * Defines constants for the protocol between the BLOB {@link BlobServer server} and the
+ * {@link AbstractBlobCache caches}.
+ */
 public class BlobServerProtocol {
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/50301254/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 04f2cdb..28430ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -150,8 +150,8 @@ public class BlobUtils {
 		File storageDir;
 
 		// NOTE: although we will be using UUIDs, there may be collisions
-		final int MAX_ATTEMPTS = 10;
-		for(int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
+		int maxAttempts = 10;
+		for (int attempt = 0; attempt < maxAttempts; attempt++) {
 			storageDir = new File(baseDir, String.format(
 					"blobStore-%s", UUID.randomUUID().toString()));
 
@@ -251,8 +251,8 @@ public class BlobUtils {
 
 	/**
 	 * Returns the path for the given blob key.
-	 * <p>
-	 * The returned path can be used with the (local or HA) BLOB store file system back-end for
+	 *
+	 * <p>The returned path can be used with the (local or HA) BLOB store file system back-end for
 	 * recovery purposes and follows the same scheme as {@link #getStorageLocation(File, JobID,
 	 * BlobKey)}.
 	 *
@@ -403,12 +403,12 @@ public class BlobUtils {
 		}
 	}
 
-	static void closeSilently(Socket socket, Logger LOG) {
+	static void closeSilently(Socket socket, Logger log) {
 		if (socket != null) {
 			try {
 				socket.close();
 			} catch (Throwable t) {
-				LOG.debug("Exception while closing BLOB server connection socket.", t);
+				log.debug("Exception while closing BLOB server connection socket.", t);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/50301254/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 4fed4cd..4a07a60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -46,10 +46,10 @@ public class FileSystemBlobStore implements BlobStoreService {
 
 	private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class);
 
-	/** The file system in which blobs are stored */
+	/** The file system in which blobs are stored. */
 	private final FileSystem fileSystem;
-	
-	/** The base path of the blob store */
+
+	/** The base path of the blob store. */
 	private final String basePath;
 
 	public FileSystemBlobStore(FileSystem fileSystem, String storagePath) throws IOException {
@@ -148,7 +148,7 @@ public class FileSystemBlobStore implements BlobStoreService {
 	private boolean delete(String blobPath) {
 		try {
 			LOG.debug("Deleting {}.", blobPath);
-			
+
 			Path path = new Path(blobPath);
 
 			boolean result = fileSystem.delete(path, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/50301254/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
index c760d04..6e05aa0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
@@ -78,7 +78,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for GET-specific parts of the {@link BlobCacheService}.
  *
- * This includes access of transient BLOBs from the {@link PermanentBlobCache}, permanent BLOBS from
+ * <p>This includes access of transient BLOBs from the {@link PermanentBlobCache}, permanent BLOBS from
  * the {@link TransientBlobCache}, and how failing GET requests behave in the presence of failures
  * when used with a {@link BlobCacheService}.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/50301254/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index 664dc28..b654cee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -36,10 +36,10 @@ import java.io.IOException;
 public class BlobClientSslTest extends BlobClientTest {
 
 	/** The instance of the SSL BLOB server used during the tests. */
-	private static BlobServer BLOB_SSL_SERVER;
+	private static BlobServer blobSslServer;
 
 	/** Instance of a non-SSL BLOB server with SSL-enabled security options. */
-	private static BlobServer BLOB_NON_SSL_SERVER;
+	private static BlobServer blobNonSslServer;
 
 	/** The SSL blob service client configuration. */
 	private static Configuration sslClientConfig;
@@ -62,8 +62,8 @@ public class BlobClientSslTest extends BlobClientTest {
 		config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
 		config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
 		config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
-		BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore());
-		BLOB_SSL_SERVER.start();
+		blobSslServer = new BlobServer(config, new VoidBlobStore());
+		blobSslServer.start();
 
 		sslClientConfig = new Configuration();
 		sslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
@@ -81,8 +81,8 @@ public class BlobClientSslTest extends BlobClientTest {
 		config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
 		config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
 		config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
-		BLOB_NON_SSL_SERVER = new BlobServer(config, new VoidBlobStore());
-		BLOB_NON_SSL_SERVER.start();
+		blobNonSslServer = new BlobServer(config, new VoidBlobStore());
+		blobNonSslServer.start();
 
 		nonSslClientConfig = new Configuration();
 		nonSslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
@@ -96,11 +96,11 @@ public class BlobClientSslTest extends BlobClientTest {
 	 */
 	@AfterClass
 	public static void stopServers() throws IOException {
-		if (BLOB_SSL_SERVER != null) {
-			BLOB_SSL_SERVER.close();
+		if (blobSslServer != null) {
+			blobSslServer.close();
 		}
-		if (BLOB_NON_SSL_SERVER != null) {
-			BLOB_NON_SSL_SERVER.close();
+		if (blobNonSslServer != null) {
+			blobNonSslServer.close();
 		}
 	}
 
@@ -109,7 +109,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	}
 
 	protected BlobServer getBlobServer() {
-		return BLOB_SSL_SERVER;
+		return blobSslServer;
 	}
 
 	/**
@@ -117,7 +117,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	 */
 	@Test
 	public void testUploadJarFilesHelper() throws Exception {
-		uploadJarFile(BLOB_SSL_SERVER, sslClientConfig);
+		uploadJarFile(blobSslServer, sslClientConfig);
 	}
 
 	/**
@@ -126,7 +126,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	@Test(expected = IOException.class)
 	public void testSSLClientFailure() throws Exception {
 		// SSL client connected to non-ssl server
-		uploadJarFile(BLOB_SERVER, sslClientConfig);
+		uploadJarFile(blobServer, sslClientConfig);
 	}
 
 	/**
@@ -135,7 +135,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	@Test(expected = IOException.class)
 	public void testSSLClientFailure2() throws Exception {
 		// SSL client connected to non-ssl server
-		uploadJarFile(BLOB_NON_SSL_SERVER, sslClientConfig);
+		uploadJarFile(blobNonSslServer, sslClientConfig);
 	}
 
 	/**
@@ -144,7 +144,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	@Test(expected = IOException.class)
 	public void testSSLServerFailure() throws Exception {
 		// Non-SSL client connected to ssl server
-		uploadJarFile(BLOB_SSL_SERVER, clientConfig);
+		uploadJarFile(blobSslServer, clientConfig);
 	}
 
 	/**
@@ -153,7 +153,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	@Test(expected = IOException.class)
 	public void testSSLServerFailure2() throws Exception {
 		// Non-SSL client connected to ssl server
-		uploadJarFile(BLOB_SSL_SERVER, nonSslClientConfig);
+		uploadJarFile(blobSslServer, nonSslClientConfig);
 	}
 
 	/**
@@ -161,7 +161,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	 */
 	@Test
 	public void testNonSSLConnection() throws Exception {
-		uploadJarFile(BLOB_SERVER, clientConfig);
+		uploadJarFile(blobServer, clientConfig);
 	}
 
 	/**
@@ -169,7 +169,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	 */
 	@Test
 	public void testNonSSLConnection2() throws Exception {
-		uploadJarFile(BLOB_SERVER, nonSslClientConfig);
+		uploadJarFile(blobServer, nonSslClientConfig);
 	}
 
 	/**
@@ -177,7 +177,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	 */
 	@Test
 	public void testNonSSLConnection3() throws Exception {
-		uploadJarFile(BLOB_NON_SSL_SERVER, clientConfig);
+		uploadJarFile(blobNonSslServer, clientConfig);
 	}
 
 	/**
@@ -185,6 +185,6 @@ public class BlobClientSslTest extends BlobClientTest {
 	 */
 	@Test
 	public void testNonSSLConnection4() throws Exception {
-		uploadJarFile(BLOB_NON_SSL_SERVER, nonSslClientConfig);
+		uploadJarFile(blobNonSslServer, nonSslClientConfig);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/50301254/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 9e4f4b7..8c2b7c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -62,7 +62,7 @@ public class BlobClientTest extends TestLogger {
 	private static final int TEST_BUFFER_SIZE = 17 * 1000;
 
 	/** The instance of the (non-ssl) BLOB server used during the tests. */
-	static BlobServer BLOB_SERVER;
+	static BlobServer blobServer;
 
 	/** The blob service (non-ssl) client configuration. */
 	static Configuration clientConfig;
@@ -79,8 +79,8 @@ public class BlobClientTest extends TestLogger {
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 
-		BLOB_SERVER = new BlobServer(config, new VoidBlobStore());
-		BLOB_SERVER.start();
+		blobServer = new BlobServer(config, new VoidBlobStore());
+		blobServer.start();
 
 		clientConfig = new Configuration();
 	}
@@ -90,8 +90,8 @@ public class BlobClientTest extends TestLogger {
 	 */
 	@AfterClass
 	public static void stopServer() throws IOException {
-		if (BLOB_SERVER != null) {
-			BLOB_SERVER.close();
+		if (blobServer != null) {
+			blobServer.close();
 		}
 	}
 
@@ -319,7 +319,7 @@ public class BlobClientTest extends TestLogger {
 	}
 
 	protected BlobServer getBlobServer() {
-		return BLOB_SERVER;
+		return blobServer;
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/50301254/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
index 003eaf6..57c6366 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
@@ -78,7 +78,7 @@ public class TestingFailingBlobServer extends BlobServer {
 				if (socket != null) {
 					try {
 						socket.close();
-					} catch(Throwable ignored) {}
+					} catch (Throwable ignored) {}
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/50301254/tools/maven/suppressions-runtime.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml
index 59f3019..8f0162d 100644
--- a/tools/maven/suppressions-runtime.xml
+++ b/tools/maven/suppressions-runtime.xml
@@ -24,13 +24,6 @@ under the License.
 
 <suppressions>
 	<suppress
-		files="(.*)runtime[/\\]blob[/\\](.*)"
-		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
-	<suppress
-		files="(.*)test[/\\](.*)runtime[/\\]blob[/\\](.*)"
-		checks="AvoidStarImport|UnusedImport"/>
-	<suppress
 		files="(.*)runtime[/\\]checkpoint[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
 	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->


[3/9] flink git commit: [FLINK-8383][mesos] Disable test-jar shading

Posted by ch...@apache.org.
[FLINK-8383][mesos] Disable test-jar shading

This closes #5258.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c2a32c2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c2a32c2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c2a32c2

Branch: refs/heads/master
Commit: 7c2a32c2863e83d075dde05cd35e93f3e08306c0
Parents: 2558ae5
Author: zentol <ch...@apache.org>
Authored: Mon Jan 8 14:49:37 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 8 17:29:51 2018 +0100

----------------------------------------------------------------------
 flink-mesos/pom.xml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c2a32c2/flink-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml
index 36fc476..49df96a 100644
--- a/flink-mesos/pom.xml
+++ b/flink-mesos/pom.xml
@@ -262,6 +262,7 @@ under the License.
 							<goal>shade</goal>
 						</goals>
 						<configuration>
+							<shadeTestJar>false</shadeTestJar>
 							<artifactSet>
 								<includes combine.children="append">
 									<include>com.google.protobuf:protobuf-java</include>


[6/9] flink git commit: [hotfix][docs] Fix DataStream iterations documentation

Posted by ch...@apache.org.
[hotfix][docs] Fix DataStream iterations documentation

* Fix a scala example which is using a wrong variable
* Remove partitioning descriptions
  * partitioning parameters are already removed from
  IterativeStream#closeWith/DateStream#iterate
  * https://github.com/apache/flink/pull/988
  * https://github.com/apache/flink/pull/4655

This closes #5249.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b96d100
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b96d100
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b96d100

Branch: refs/heads/master
Commit: 7b96d10094d0a6f87a3e912e28ce8b68b310d28f
Parents: 4ac348d
Author: okumin <ma...@okumin.com>
Authored: Sat Jan 6 22:48:02 2018 +0900
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 8 18:47:08 2018 +0100

----------------------------------------------------------------------
 docs/dev/datastream_api.md | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b96d100/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index 6bb755e..09580b1 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -375,9 +375,6 @@ iteration.closeWith(iterationBody.filter(/* one part of the stream */));
 DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);
 {% endhighlight %}
 
-By default the partitioning of the feedback stream will be automatically set to be the same as the input of the
-iteration head. To override this the user can set an optional boolean flag in the `closeWith` method.
-
 For example, here is program that continuously subtracts 1 from a series of integers until they reach zero:
 
 {% highlight java %}
@@ -424,14 +421,10 @@ are forwarded downstream using filters.
 val iteratedStream = someDataStream.iterate(
   iteration => {
     val iterationBody = iteration.map(/* this is executed many times */)
-    (tail.filter(/* one part of the stream */), tail.filter(/* some other part of the stream */))
+    (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
 })
 {% endhighlight %}
 
-
-By default the partitioning of the feedback stream will be automatically set to be the same as the input of the
-iteration head. To override this the user can set an optional boolean flag in the `closeWith` method.
-
 For example, here is program that continuously subtracts 1 from a series of integers until they reach zero:
 
 {% highlight scala %}


[4/9] flink git commit: [FLINK-8200][tests] Use TemporaryFolder in RocksDBAsyncSnapshotTest

Posted by ch...@apache.org.
[FLINK-8200][tests] Use TemporaryFolder in RocksDBAsyncSnapshotTest

This closes #5122.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ac348d1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ac348d1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ac348d1

Branch: refs/heads/master
Commit: 4ac348d1fa539d64ce9317035716243c74d70818
Parents: 10aae99
Author: wenlong.lwl <we...@alibaba-inc.com>
Authored: Tue Dec 5 15:16:32 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 8 18:47:07 2018 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBAsyncSnapshotTest.java   | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4ac348d1/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 2ba0494..71c5e77 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -60,7 +59,9 @@ import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -70,7 +71,6 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RunnableFuture;
@@ -96,6 +96,12 @@ import static org.mockito.Mockito.verify;
 public class RocksDBAsyncSnapshotTest extends TestLogger {
 
 	/**
+	 * Temporary fold for test.
+	 */
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
 	 * This ensures that asynchronous state handles are actually materialized asynchronously.
 	 *
 	 * <p>We use latches to block at various stages and see if the code still continues through
@@ -119,7 +125,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 
-		File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
+		File dbDir = temporaryFolder.newFolder();
 
 		RocksDBStateBackend backend = new RocksDBStateBackend(new MemoryStateBackend());
 		backend.setDbStoragePath(dbDir.getAbsolutePath());
@@ -225,7 +231,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 
-		File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
+		File dbDir = temporaryFolder.newFolder();
 
 		BlockingStreamMemoryStateBackend memoryStateBackend = new BlockingStreamMemoryStateBackend();
 
@@ -333,7 +339,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 
 		RocksDBStateBackend backend = new RocksDBStateBackend(stateBackend);
 
-		backend.setDbStoragePath("file:///tmp/foobar");
+		backend.setDbStoragePath(temporaryFolder.newFolder().toURI().toString());
 
 		AbstractKeyedStateBackend<Void> keyedStateBackend = backend.createKeyedStateBackend(
 			env,


[8/9] flink git commit: [FLINK-8320][docs] Clarify that only Java 8 is supported

Posted by ch...@apache.org.
[FLINK-8320][docs] Clarify that only Java 8 is supported


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ef1f321
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ef1f321
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ef1f321

Branch: refs/heads/master
Commit: 1ef1f321192e11aea62ba0089a42799d2eb127a0
Parents: a31e8a3
Author: zentol <ch...@apache.org>
Authored: Mon Jan 8 17:25:43 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 8 18:47:09 2018 +0100

----------------------------------------------------------------------
 docs/quickstart/java_api_quickstart.md  | 2 +-
 docs/quickstart/scala_api_quickstart.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ef1f321/docs/quickstart/java_api_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/quickstart/java_api_quickstart.md b/docs/quickstart/java_api_quickstart.md
index 7665a75..baf14de 100644
--- a/docs/quickstart/java_api_quickstart.md
+++ b/docs/quickstart/java_api_quickstart.md
@@ -31,7 +31,7 @@ Start working on your Flink Java program in a few simple steps.
 
 ## Requirements
 
-The only requirements are working __Maven 3.0.4__ (or higher) and __Java 8.x__ (or higher) installations.
+The only requirements are working __Maven 3.0.4__ (or higher) and __Java 8.x__ installations.
 
 ## Create Project
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1ef1f321/docs/quickstart/scala_api_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/quickstart/scala_api_quickstart.md b/docs/quickstart/scala_api_quickstart.md
index dce71dd..40c02a9 100644
--- a/docs/quickstart/scala_api_quickstart.md
+++ b/docs/quickstart/scala_api_quickstart.md
@@ -114,7 +114,7 @@ Now you can import the project into Eclipse via `File -> Import... -> Existing P
 
 ### Requirements
 
-The only requirements are working __Maven 3.0.4__ (or higher) and __Java 8.x__ (or higher) installations.
+The only requirements are working __Maven 3.0.4__ (or higher) and __Java 8.x__ installations.
 
 
 ### Create Project


[5/9] flink git commit: [FLINK-8250][runtime] Remove unused RecordSerializer#instantiateMetrics

Posted by ch...@apache.org.
[FLINK-8250][runtime] Remove unused RecordSerializer#instantiateMetrics

This closes #5162.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10aae991
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10aae991
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10aae991

Branch: refs/heads/master
Commit: 10aae991833447506e8e8231c32fb9f816920597
Parents: 5030125
Author: zentol <ch...@apache.org>
Authored: Wed Dec 13 13:23:15 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 8 18:47:07 2018 +0100

----------------------------------------------------------------------
 .../io/network/api/serialization/RecordSerializer.java       | 8 --------
 .../network/api/serialization/SpanningRecordSerializer.java  | 5 -----
 2 files changed, 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10aae991/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index 6a07f31..f09bd4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -22,7 +22,6 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import java.io.IOException;
 
@@ -125,11 +124,4 @@ public interface RecordSerializer<T extends IOReadableWritable> {
 	 * @return <tt>true</tt> if some data is present
 	 */
 	boolean hasData();
-
-	/**
-	 * Instantiates all metrics.
-	 *
-	 * @param metrics metric group
-	 */
-	void instantiateMetrics(TaskIOMetricGroup metrics);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/10aae991/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index efdfaa1..5e6f8e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -23,7 +23,6 @@ import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import javax.annotation.Nullable;
 
@@ -178,8 +177,4 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 		// either data in current target buffer or intermediate buffers
 		return (targetBuffer != null && !targetBuffer.isEmpty()) || lengthBuffer.hasRemaining() || dataBuffer.hasRemaining();
 	}
-
-	@Override
-	public void instantiateMetrics(TaskIOMetricGroup metrics) {
-	}
 }


[2/9] flink git commit: [FLINK-8292] Remove unnecessary force cast in DataStreamSource

Posted by ch...@apache.org.
[FLINK-8292] Remove unnecessary force cast in DataStreamSource

This closes #5180.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3cdc5d1d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3cdc5d1d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3cdc5d1d

Branch: refs/heads/master
Commit: 3cdc5d1dce7c8cf0db0352d86867f73f6ccd9be9
Parents: 7c2a32c
Author: Matrix42 <93...@qq.com>
Authored: Tue Dec 19 16:20:39 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 8 17:29:51 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/api/datastream/DataStreamSource.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3cdc5d1d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index f5771a8..d08514f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -54,7 +54,8 @@ public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
 		if (parallelism != 1 && !isParallel) {
 			throw new IllegalArgumentException("Source: " + transformation.getId() + " is not a parallel source");
 		} else {
-			return (DataStreamSource<T>) super.setParallelism(parallelism);
+			super.setParallelism(parallelism);
+			return this;
 		}
 	}
 }


[9/9] flink git commit: [FLINK-8388][docs] Fix baseUrl for master branch

Posted by ch...@apache.org.
[FLINK-8388][docs] Fix baseUrl for master branch

This closes #5263.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f16335d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f16335d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f16335d4

Branch: refs/heads/master
Commit: f16335d427592007a5b242a1b4b0cfcd5bd36858
Parents: 1ef1f32
Author: zentol <ch...@apache.org>
Authored: Mon Jan 8 17:11:01 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 8 18:47:10 2018 +0100

----------------------------------------------------------------------
 docs/_config.yml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f16335d4/docs/_config.yml
----------------------------------------------------------------------
diff --git a/docs/_config.yml b/docs/_config.yml
index 1cae2d4..1ca8726 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -44,7 +44,7 @@ github_url: "https://github.com/apache/flink"
 download_url: "http://flink.apache.org/downloads.html"
 
 # please use a protocol relative URL here
-baseurl: //ci.apache.org/projects/flink/flink-docs-release-1.4
+baseurl: //ci.apache.org/projects/flink/flink-docs-master
 
 # Flag whether this is a stable version or not. Used for the quickstart page.
 is_stable: false
@@ -53,6 +53,7 @@ is_stable: false
 show_outdated_warning: false
 
 previous_docs:
+  1.4: http://ci.apache.org/projects/flink/flink-docs-release-1.4
   1.3: http://ci.apache.org/projects/flink/flink-docs-release-1.3
   1.2: http://ci.apache.org/projects/flink/flink-docs-release-1.2
   1.1: http://ci.apache.org/projects/flink/flink-docs-release-1.1


[7/9] flink git commit: [hotfix][docs][metrics] Fix Threads.Count metric reference

Posted by ch...@apache.org.
[hotfix][docs][metrics] Fix Threads.Count metric reference

This closes #5213.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a31e8a30
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a31e8a30
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a31e8a30

Branch: refs/heads/master
Commit: a31e8a305cbff9102dcafda7849a379af081e742
Parents: 7b96d10
Author: yew1eb <ye...@gmail.com>
Authored: Fri Dec 29 01:15:40 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 8 18:47:09 2018 +0100

----------------------------------------------------------------------
 docs/monitoring/metrics.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a31e8a30/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index eac73f7..a820821 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -903,8 +903,8 @@ Thus, in order to infer the metric identifier:
   <tbody>
     <tr>
       <th rowspan="1"><strong>Job-/TaskManager</strong></th>
-      <td rowspan="1">Status.JVM.ClassLoader</td>
-      <td>Threads.Count</td>
+      <td rowspan="1">Status.JVM.Threads</td>
+      <td>Count</td>
       <td>The total number of live threads.</td>
       <td>Gauge</td>
     </tr>