Posted to commits@flink.apache.org by se...@apache.org on 2016/08/31 17:02:32 UTC

[1/2] flink git commit: [FLINK-4539] [runtime] Reuse functionality for Physical Memory size in 'Hardware' and 'EnvironmentInformation'.

Repository: flink
Updated Branches:
  refs/heads/master db919a585 -> 7cd9bb5f1


[FLINK-4539] [runtime] Reuse functionality for Physical Memory size in 'Hardware' and 'EnvironmentInformation'.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97a83a1f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97a83a1f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97a83a1f

Branch: refs/heads/master
Commit: 97a83a1fa62c06db19b09104d85c5e2bb850df0f
Parents: db919a5
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 30 16:31:42 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 31 18:18:02 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/instance/Hardware.java | 245 -----------------
 .../runtime/instance/HardwareDescription.java   |   2 +
 .../runtime/util/EnvironmentInformation.java    |  35 ++-
 .../org/apache/flink/runtime/util/Hardware.java | 272 +++++++++++++++++++
 .../flink/runtime/instance/HardwareTest.java    |  49 ----
 .../apache/flink/runtime/util/HardwareTest.java |  49 ++++
 6 files changed, 340 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97a83a1f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Hardware.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Hardware.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Hardware.java
deleted file mode 100644
index c393d68..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Hardware.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.flink.util.OperatingSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Convenience class to extract hardware specifics of the computer executing this class
- */
-public class Hardware {
-
-	private static final Logger LOG = LoggerFactory.getLogger(Hardware.class);
-	
-	private static final String LINUX_MEMORY_INFO_PATH = "/proc/meminfo";
-
-	private static final Pattern LINUX_MEMORY_REGEX = Pattern.compile("^MemTotal:\\s*(\\d+)\\s+kB$");
-	
-
-	
-	/**
-	 * Gets the number of CPU cores (hardware contexts) that the JVM has access to.
-	 * 
-	 * @return The number of CPU cores.
-	 */
-	public static int getNumberCPUCores() {
-		return Runtime.getRuntime().availableProcessors();
-	}
-	
-	/**
-	 * Returns the size of the physical memory in bytes.
-	 * 
-	 * @return the size of the physical memory in bytes or <code>-1</code> if
-	 *         the size could not be determined
-	 */
-	public static long getSizeOfPhysicalMemory() {
-		switch (OperatingSystem.getCurrentOperatingSystem()) {
-			case LINUX:
-				return getSizeOfPhysicalMemoryForLinux();
-				
-			case WINDOWS:
-				return getSizeOfPhysicalMemoryForWindows();
-				
-			case MAC_OS:
-				return getSizeOfPhysicalMemoryForMac();
-				
-			case FREE_BSD:
-				return getSizeOfPhysicalMemoryForFreeBSD();
-				
-			case UNKNOWN:
-				LOG.error("Cannot determine size of physical memory for unknown operating system");
-				return -1;
-				
-			default:
-				LOG.error("Unrecognized OS: " + OperatingSystem.getCurrentOperatingSystem());
-				return -1;
-		}
-	}
-
-	/**
-	 * Returns the size of the physical memory in bytes on a Linux-based
-	 * operating system.
-	 * 
-	 * @return the size of the physical memory in bytes or <code>-1</code> if
-	 *         the size could not be determined
-	 */
-	private static long getSizeOfPhysicalMemoryForLinux() {
-		try (BufferedReader lineReader = new BufferedReader(new FileReader(LINUX_MEMORY_INFO_PATH))) {
-			String line;
-			while ((line = lineReader.readLine()) != null) {
-				Matcher matcher = LINUX_MEMORY_REGEX.matcher(line);
-				if (matcher.matches()) {
-					String totalMemory = matcher.group(1);
-					return Long.parseLong(totalMemory) * 1024L; // Convert from kilobyte to byte
-				}
-			}
-			// expected line did not come
-			LOG.error("Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo'). Unexpected format.");
-			return -1;
-		}
-		catch (NumberFormatException e) {
-			LOG.error("Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo'). Unexpected format.");
-			return -1;
-		}
-		catch (Throwable t) {
-			LOG.error("Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo'): " + t.getMessage(), t);
-			return -1;
-		}
-	}
-
-	/**
-	 * Returns the size of the physical memory in bytes on a Mac OS-based
-	 * operating system
-	 * 
-	 * @return the size of the physical memory in bytes or <code>-1</code> if
-	 *         the size could not be determined
-	 */
-	private static long getSizeOfPhysicalMemoryForMac() {
-
-		BufferedReader bi = null;
-
-		try {
-			Process proc = Runtime.getRuntime().exec("sysctl hw.memsize");
-
-			bi = new BufferedReader(
-					new InputStreamReader(proc.getInputStream()));
-
-			String line;
-
-			while ((line = bi.readLine()) != null) {
-				if (line.startsWith("hw.memsize")) {
-					long memsize = Long.parseLong(line.split(":")[1].trim());
-					bi.close();
-					proc.destroy();
-					return memsize;
-				}
-			}
-
-		} catch (Throwable t) {
-			LOG.error("Cannot determine physical memory of machine for MacOS host: " + t.getMessage(), t);
-			return -1;
-		} finally {
-			if (bi != null) {
-				try {
-					bi.close();
-				} catch (IOException ignored) {}
-			}
-		}
-		return -1;
-	}
-
-	/**
-	 * Returns the size of the physical memory in bytes on FreeBSD.
-	 * 
-	 * @return the size of the physical memory in bytes or <code>-1</code> if
-	 *         the size could not be determined
-	 */
-	private static long getSizeOfPhysicalMemoryForFreeBSD() {
-		BufferedReader bi = null;
-		try {
-			Process proc = Runtime.getRuntime().exec("sysctl hw.physmem");
-
-			bi = new BufferedReader(new InputStreamReader(proc.getInputStream()));
-
-			String line;
-
-			while ((line = bi.readLine()) != null) {
-				if (line.startsWith("hw.physmem")) {
-					long memsize = Long.parseLong(line.split(":")[1].trim());
-					bi.close();
-					proc.destroy();
-					return memsize;
-				}
-			}
-			
-			LOG.error("Cannot determine the size of the physical memory for FreeBSD host (using 'sysctl hw.physmem').");
-			return -1;
-		}
-		catch (Throwable t) {
-			LOG.error("Cannot determine the size of the physical memory for FreeBSD host (using 'sysctl hw.physmem'): " + t.getMessage(), t);
-			return -1;
-		}
-		finally {
-			if (bi != null) {
-				try {
-					bi.close();
-				} catch (IOException ignored) {}
-			}
-		}
-	}
-
-	/**
-	 * Returns the size of the physical memory in bytes on Windows.
-	 * 
-	 * @return the size of the physical memory in bytes or <code>-1</code> if
-	 *         the size could not be determined
-	 */
-	private static long getSizeOfPhysicalMemoryForWindows() {
-		BufferedReader bi = null;
-		try {
-			Process proc = Runtime.getRuntime().exec("wmic memorychip get capacity");
-
-			bi = new BufferedReader(new InputStreamReader(proc.getInputStream()));
-
-			String line = bi.readLine();
-			if (line == null) {
-				return -1L;
-			}
-
-			if (!line.startsWith("Capacity")) {
-				return -1L;
-			}
-
-			long sizeOfPhyiscalMemory = 0L;
-			while ((line = bi.readLine()) != null) {
-				if (line.isEmpty()) {
-					continue;
-				}
-
-				line = line.replaceAll(" ", "");
-				sizeOfPhyiscalMemory += Long.parseLong(line);
-			}
-			return sizeOfPhyiscalMemory;
-		}
-		catch (Throwable t) {
-			LOG.error("Cannot determine the size of the physical memory for Windows host (using 'wmic memorychip'): " + t.getMessage(), t);
-			return -1L;
-		}
-		finally {
-			if (bi != null) {
-				try {
-					bi.close();
-				} catch (Throwable ignored) {}
-			}
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private Hardware() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/97a83a1f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
index 92f7414..bfcc1e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.instance;
 
+import org.apache.flink.runtime.util.Hardware;
+
 import java.io.Serializable;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/97a83a1f/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index a10522d..39fa80e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -20,15 +20,16 @@ package org.apache.flink.runtime.util;
 
 import java.io.InputStream;
 import java.lang.management.ManagementFactory;
-import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Properties;
 
 import org.apache.hadoop.util.VersionInfo;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -102,9 +103,7 @@ public class EnvironmentInformation {
 		String user = System.getProperty("user.name");
 		if (user == null) {
 			user = UNKNOWN;
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Cannot determine user/group information for the current user.");
-			}
+			LOG.debug("Cannot determine user/group information for the current user.");
 		}
 		return user;
 	}
@@ -112,27 +111,27 @@ public class EnvironmentInformation {
 	/**
 	 * The maximum JVM heap size, in bytes.
 	 * 
+	 * <p>This method uses the <i>-Xmx</i> value of the JVM, if set. If not set, it returns (as
+	 * a heuristic) 1/4th of the physical memory size.
+	 * 
 	 * @return The maximum JVM heap size, in bytes.
 	 */
 	public static long getMaxJvmHeapMemory() {
-		long maxMemory = Runtime.getRuntime().maxMemory();
-
-		if (maxMemory == Long.MAX_VALUE) {
-			// amount of free memory unknown
-			try {
-				// workaround for Oracle JDK
-				OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean();
-				Class<?> clazz = Class.forName("com.sun.management.OperatingSystemMXBean");
-				Method method = clazz.getMethod("getTotalPhysicalMemorySize");
-				maxMemory = (Long) method.invoke(operatingSystemMXBean) / 4;
-			}
-			catch (Throwable e) {
+		final long maxMemory = Runtime.getRuntime().maxMemory();
+		if (maxMemory != Long.MAX_VALUE) {
+			// we have the proper max memory
+			return maxMemory;
+		} else {
+			// max JVM heap size is not set - fall back to the heuristic of 1/4th of the physical memory
+			final long physicalMemory = Hardware.getSizeOfPhysicalMemory();
+			if (physicalMemory != -1) {
+				// got proper value for physical memory
+				return physicalMemory / 4;
+			} else {
 				throw new RuntimeException("Could not determine the amount of free memory.\n" +
 						"Please set the maximum memory for the JVM, e.g. -Xmx512M for 512 megabytes.");
 			}
 		}
-		
-		return maxMemory;
 	}
 
 	/**

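The hunk above makes the fallback explicit: use the -Xmx value when it is set, otherwise derive a heuristic from the physical memory size. A minimal sketch of how a caller might exercise the new behavior (the probe class name is hypothetical; it assumes flink-runtime on the classpath):

import org.apache.flink.runtime.util.EnvironmentInformation;

public class HeapSizeProbe {

	public static void main(String[] args) {
		// Returns the -Xmx value if one was set; otherwise falls back to
		// 1/4th of the physical memory reported by Hardware.getSizeOfPhysicalMemory(),
		// and throws a RuntimeException if neither can be determined.
		long heapBytes = EnvironmentInformation.getMaxJvmHeapMemory();
		System.out.println("Max JVM heap: " + (heapBytes >> 20) + " MB");
	}
}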
http://git-wip-us.apache.org/repos/asf/flink/blob/97a83a1f/flink-runtime/src/main/java/org/apache/flink/runtime/util/Hardware.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/Hardware.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/Hardware.java
new file mode 100644
index 0000000..0b0e7ff
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/Hardware.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.flink.util.OperatingSystem;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convenience class to extract hardware specifics of the machine running the JVM.
+ */
+public class Hardware {
+
+	private static final Logger LOG = LoggerFactory.getLogger(Hardware.class);
+
+	private static final String LINUX_MEMORY_INFO_PATH = "/proc/meminfo";
+
+	private static final Pattern LINUX_MEMORY_REGEX = Pattern.compile("^MemTotal:\\s*(\\d+)\\s+kB$");
+
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Gets the number of CPU cores (hardware contexts) that the JVM has access to.
+	 * 
+	 * @return The number of CPU cores.
+	 */
+	public static int getNumberCPUCores() {
+		return Runtime.getRuntime().availableProcessors();
+	}
+
+	/**
+	 * Returns the size of the physical memory in bytes.
+	 * 
+	 * @return the size of the physical memory in bytes or {@code -1}, if
+	 *         the size could not be determined.
+	 */
+	public static long getSizeOfPhysicalMemory() {
+		// first try if the JVM can directly tell us what the system memory is
+		// this works only on Oracle JVMs
+		try {
+			Class<?> clazz = Class.forName("com.sun.management.OperatingSystemMXBean");
+			Method method = clazz.getMethod("getTotalPhysicalMemorySize");
+			OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean();
+
+			// someone may install different beans, so we need to check whether the bean
+			// is in fact the sun management bean
+			if (clazz.isInstance(operatingSystemMXBean)) {
+				return (Long) method.invoke(operatingSystemMXBean);
+			}
+		}
+		catch (ClassNotFoundException e) {
+			// this happens on non-Oracle JVMs, do nothing and use the alternative code paths
+		}
+		catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+			LOG.warn("Access to physical memory size: " +
+					"com.sun.management.OperatingSystemMXBean incompatibly changed.", e);
+		}
+
+		// we now try the OS specific access paths
+		switch (OperatingSystem.getCurrentOperatingSystem()) {
+			case LINUX:
+				return getSizeOfPhysicalMemoryForLinux();
+
+			case WINDOWS:
+				return getSizeOfPhysicalMemoryForWindows();
+
+			case MAC_OS:
+				return getSizeOfPhysicalMemoryForMac();
+
+			case FREE_BSD:
+				return getSizeOfPhysicalMemoryForFreeBSD();
+
+			case UNKNOWN:
+				LOG.error("Cannot determine size of physical memory for unknown operating system");
+				return -1;
+
+			default:
+				LOG.error("Unrecognized OS: " + OperatingSystem.getCurrentOperatingSystem());
+				return -1;
+		}
+	}
+
+	/**
+	 * Returns the size of the physical memory in bytes on a Linux-based
+	 * operating system.
+	 * 
+	 * @return the size of the physical memory in bytes or {@code -1}, if
+	 *         the size could not be determined
+	 */
+	private static long getSizeOfPhysicalMemoryForLinux() {
+		try (BufferedReader lineReader = new BufferedReader(new FileReader(LINUX_MEMORY_INFO_PATH))) {
+			String line;
+			while ((line = lineReader.readLine()) != null) {
+				Matcher matcher = LINUX_MEMORY_REGEX.matcher(line);
+				if (matcher.matches()) {
+					String totalMemory = matcher.group(1);
+					return Long.parseLong(totalMemory) * 1024L; // Convert from kilobyte to byte
+				}
+			}
+			// expected line did not come
+			LOG.error("Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo'). " +
+					"Unexpected format.");
+			return -1;
+		}
+		catch (NumberFormatException e) {
+			LOG.error("Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo'). " +
+					"Unexpected format.");
+			return -1;
+		}
+		catch (Throwable t) {
+			LOG.error("Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo') ", t);
+			return -1;
+		}
+	}
+
+	/**
+	 * Returns the size of the physical memory in bytes on a Mac OS-based
+	 * operating system
+	 * 
+	 * @return the size of the physical memory in bytes or {@code -1}, if
+	 *         the size could not be determined
+	 */
+	private static long getSizeOfPhysicalMemoryForMac() {
+		BufferedReader bi = null;
+		try {
+			Process proc = Runtime.getRuntime().exec("sysctl hw.memsize");
+
+			bi = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+
+			String line;
+			while ((line = bi.readLine()) != null) {
+				if (line.startsWith("hw.memsize")) {
+					long memsize = Long.parseLong(line.split(":")[1].trim());
+					bi.close();
+					proc.destroy();
+					return memsize;
+				}
+			}
+
+		} catch (Throwable t) {
+			LOG.error("Cannot determine physical memory of machine for MacOS host", t);
+			return -1;
+		} finally {
+			if (bi != null) {
+				try {
+					bi.close();
+				} catch (IOException ignored) {}
+			}
+		}
+		return -1;
+	}
+
+	/**
+	 * Returns the size of the physical memory in bytes on FreeBSD.
+	 * 
+	 * @return the size of the physical memory in bytes or {@code -1}, if
+	 *         the size could not be determined
+	 */
+	private static long getSizeOfPhysicalMemoryForFreeBSD() {
+		BufferedReader bi = null;
+		try {
+			Process proc = Runtime.getRuntime().exec("sysctl hw.physmem");
+
+			bi = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+
+			String line;
+			while ((line = bi.readLine()) != null) {
+				if (line.startsWith("hw.physmem")) {
+					long memsize = Long.parseLong(line.split(":")[1].trim());
+					bi.close();
+					proc.destroy();
+					return memsize;
+				}
+			}
+			
+			LOG.error("Cannot determine the size of the physical memory for FreeBSD host " +
+					"(using 'sysctl hw.physmem').");
+			return -1;
+		}
+		catch (Throwable t) {
+			LOG.error("Cannot determine the size of the physical memory for FreeBSD host " +
+					"(using 'sysctl hw.physmem')", t);
+			return -1;
+		}
+		finally {
+			if (bi != null) {
+				try {
+					bi.close();
+				} catch (IOException ignored) {}
+			}
+		}
+	}
+
+	/**
+	 * Returns the size of the physical memory in bytes on Windows.
+	 * 
+	 * @return the size of the physical memory in bytes or {@code -1}, if
+	 *         the size could not be determined
+	 */
+	private static long getSizeOfPhysicalMemoryForWindows() {
+		BufferedReader bi = null;
+		try {
+			Process proc = Runtime.getRuntime().exec("wmic memorychip get capacity");
+
+			bi = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+
+			String line = bi.readLine();
+			if (line == null) {
+				return -1L;
+			}
+
+			if (!line.startsWith("Capacity")) {
+				return -1L;
+			}
+
+			long sizeOfPhysicalMemory = 0L;
+			while ((line = bi.readLine()) != null) {
+				if (line.isEmpty()) {
+					continue;
+				}
+
+				line = line.replaceAll(" ", "");
+				sizeOfPhysicalMemory += Long.parseLong(line);
+			}
+			return sizeOfPhysicalMemory;
+		}
+		catch (Throwable t) {
+			LOG.error("Cannot determine the size of the physical memory for Windows host " +
+					"(using 'wmic memorychip')", t);
+			return -1L;
+		}
+		finally {
+			if (bi != null) {
+				try {
+					bi.close();
+				} catch (Throwable ignored) {}
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private Hardware() {}
+}

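The reflective probe at the top of getSizeOfPhysicalMemory() can be reproduced standalone. A minimal sketch, assuming a JVM that ships the com.sun.management extension (the class name PhysicalMemoryProbe is hypothetical):

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.reflect.Method;

public class PhysicalMemoryProbe {

	public static void main(String[] args) throws Exception {
		OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
		// Load the Sun-specific subinterface reflectively, so this compiles
		// and runs on JVMs that do not ship it.
		Class<?> sunOsBean = Class.forName("com.sun.management.OperatingSystemMXBean");
		if (sunOsBean.isInstance(osBean)) {
			Method method = sunOsBean.getMethod("getTotalPhysicalMemorySize");
			System.out.println("Physical memory: " + (Long) method.invoke(osBean) + " bytes");
		} else {
			// on other JVMs, Hardware falls back to the OS-specific paths above
			System.out.println("com.sun.management bean not available.");
		}
	}
}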
http://git-wip-us.apache.org/repos/asf/flink/blob/97a83a1f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/HardwareTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/HardwareTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/HardwareTest.java
deleted file mode 100644
index a7ac28b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/HardwareTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-
-public class HardwareTest {
-
-	@Test
-	public void testCpuCores() {
-		try {
-			assertTrue(Hardware.getNumberCPUCores() >= 0);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testPhysicalMemory() {
-		try {
-			long physMem = Hardware.getSizeOfPhysicalMemory();
-			assertTrue(physMem >= -1);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/97a83a1f/flink-runtime/src/test/java/org/apache/flink/runtime/util/HardwareTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/HardwareTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/HardwareTest.java
new file mode 100644
index 0000000..1995f4d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/HardwareTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class HardwareTest {
+
+	@Test
+	public void testCpuCores() {
+		try {
+			assertTrue(Hardware.getNumberCPUCores() >= 0);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPhysicalMemory() {
+		try {
+			long physMem = Hardware.getSizeOfPhysicalMemory();
+			assertTrue(physMem >= -1);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}


[2/2] flink git commit: [hotfix] [cassandra connector] Fix minor issues in CassandraConnectorTest.

Posted by se...@apache.org.
[hotfix] [cassandra connector] Fix minor issues in CassandraConnectorTest.

The test now starts a single mini cluster and reuses it across tests, rather than spawning a local environment for each test; the setup pattern is sketched below.
This also renames the CassandraConnectorTest to CassandraConnectorITCase.

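In outline, the reuse pattern looks as follows (condensed from the CassandraConnectorITCase diff below, which also carries the required imports; this is not a drop-in test on its own):

	private static ForkableFlinkMiniCluster flinkCluster;

	@BeforeClass
	public static void startFlink() throws Exception {
		// one shared mini cluster for the whole test class,
		// with enough task slots for parallelism 4
		Configuration config = new Configuration();
		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
		flinkCluster = new ForkableFlinkMiniCluster(config);
		flinkCluster.start();
	}

	@Before
	public void initializeExecutionEnvironment() {
		// route (Stream)ExecutionEnvironment.getExecutionEnvironment() calls
		// to the shared mini cluster instead of a fresh local environment
		TestStreamEnvironment.setAsContext(flinkCluster, 4);
		new TestEnvironment(flinkCluster, 4, false).setAsContext();
	}

	@AfterClass
	public static void stopFlink() {
		flinkCluster.stop();
	}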

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cd9bb5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cd9bb5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cd9bb5f

Branch: refs/heads/master
Commit: 7cd9bb5f1e09ad2fdbe2b7872f92432dcfbad374
Parents: 97a83a1
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 31 16:54:08 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 31 18:44:29 2016 +0200

----------------------------------------------------------------------
 .../cassandra/CassandraCommitter.java           |  13 +-
 .../connectors/cassandra/CassandraPojoSink.java |  12 +-
 .../cassandra/CassandraConnectorITCase.java     | 460 +++++++++++++++++++
 .../cassandra/CassandraConnectorTest.java       | 426 -----------------
 .../operators/WriteAheadSinkTestBase.java       |   8 +-
 5 files changed, 479 insertions(+), 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7cd9bb5f/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
index 5dceb60..e83b1be 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
@@ -26,11 +26,14 @@ import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
 /**
  * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
  * database.
- * <p/>
- * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
+ * 
+ * <p>Entries are in the form | operator_id | subtask_id | last_completed_checkpoint |
  */
 public class CassandraCommitter extends CheckpointCommitter {
-	private ClusterBuilder builder;
+
+	private static final long serialVersionUID = 1L;
+	
+	private final ClusterBuilder builder;
 	private transient Cluster cluster;
 	private transient Session session;
 
@@ -54,9 +57,6 @@ public class CassandraCommitter extends CheckpointCommitter {
 
 	/**
 	 * Internally used to set the job ID after instantiation.
-	 *
-	 * @param id
-	 * @throws Exception
 	 */
 	public void setJobId(String id) throws Exception {
 		super.setJobId(id);
@@ -66,7 +66,6 @@ public class CassandraCommitter extends CheckpointCommitter {
 	/**
 	 * Generates the necessary tables to store information.
 	 *
-	 * @return
 	 * @throws Exception
 	 */
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7cd9bb5f/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index 204a0f3..650c481 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -23,13 +23,19 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.flink.configuration.Configuration;
 
 /**
- * Flink Sink to save data into a Cassandra cluster using {@link Mapper}, which
- * it uses annotations from {@link com.datastax.driver.mapping}.
+ * Flink Sink to save data into a Cassandra cluster using 
+ * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html">Mapper</a>,
+ * which uses annotations from
+ * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/annotations/package-summary.html">
+ * com.datastax.driver.mapping.annotations</a>.
  *
  * @param <IN> Type of the elements emitted by this sink
  */
 public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
-	protected Class<IN> clazz;
+
+	private static final long serialVersionUID = 1L;
+
+	protected final Class<IN> clazz;
 	protected transient Mapper<IN> mapper;
 	protected transient MappingManager mappingManager;
 

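The Pojo entity that the ITCase below feeds into CassandraPojoSink is not part of this diff. A hypothetical sketch of such a mapper-annotated class, matching the flink.test schema the test creates (id text PRIMARY KEY, counter int, batch_id int) and using the DataStax 2.1 mapping annotations the sink relies on:

import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

import java.io.Serializable;

// hypothetical entity; the real Pojo class ships with the connector's test sources
@Table(keyspace = "flink", name = "test")
public class Pojo implements Serializable {

	private static final long serialVersionUID = 1L;

	@PartitionKey
	@Column(name = "id")
	private String id;

	@Column(name = "counter")
	private int counter;

	@Column(name = "batch_id")
	private int batchId;

	public Pojo() {}

	public Pojo(String id, int counter, int batchId) {
		this.id = id;
		this.counter = counter;
		this.batchId = batchId;
	}

	// the driver's mapper accesses fields through getters/setters
	public String getId() { return id; }
	public void setId(String id) { this.id = id; }
	public int getCounter() { return counter; }
	public void setCounter(int counter) { this.counter = counter; }
	public int getBatchId() { return batchId; }
	public void setBatchId(int batchId) { this.batchId = batchId; }
}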
http://git-wip-us.apache.org/repos/asf/flink/blob/7cd9bb5f/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
new file mode 100644
index 0000000..9388818
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+
+import org.apache.cassandra.service.CassandraDaemon;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+import org.apache.flink.test.util.TestEnvironment;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.internal.AssumptionViolatedException;
+import org.junit.runner.RunWith;
+
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Scanner;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+@PowerMockIgnore("javax.management.*")
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ResultPartitionWriter.class)
+public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
+	private static File tmpDir;
+
+	private static final boolean EMBEDDED = true;
+
+	private static EmbeddedCassandraService cassandra;
+
+	private static ClusterBuilder builder = new ClusterBuilder() {
+		@Override
+		protected Cluster buildCluster(Cluster.Builder builder) {
+			return builder
+				.addContactPoint("127.0.0.1")
+				.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
+				.withoutJMXReporting()
+				.withoutMetrics().build();
+		}
+	};
+
+	private static Cluster cluster;
+	private static Session session;
+
+	private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+	private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;";
+	private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);";
+	private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;";
+	private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)";
+	private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;";
+
+	private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
+
+	static {
+		for (int i = 0; i < 20; i++) {
+			collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
+		}
+	}
+
+	private static class EmbeddedCassandraService {
+		CassandraDaemon cassandraDaemon;
+
+		public void start() throws IOException {
+			this.cassandraDaemon = new CassandraDaemon();
+			this.cassandraDaemon.init(null);
+			this.cassandraDaemon.start();
+		}
+
+		public void stop() {
+			this.cassandraDaemon.stop();
+		}
+	}
+
+	private static ForkableFlinkMiniCluster flinkCluster;
+
+	// ------------------------------------------------------------------------
+	//  Cluster Setup (Cassandra & Flink)
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void startCassandra() throws IOException {
+
+		// check if we should run this test; the current Cassandra version requires Java >= 1.8
+		try {
+			String javaVersionString = System.getProperty("java.runtime.version").substring(0, 3);
+			float javaVersion = Float.parseFloat(javaVersionString);
+			Assume.assumeTrue(javaVersion >= 1.8f);
+		}
+		catch (AssumptionViolatedException e) {
+			System.out.println("Skipping CassandraConnectorITCase, because the JDK is below Java 8");
+			throw e;
+		}
+		catch (Exception e) {
+			LOG.error("Cannot determine Java version", e);
+			e.printStackTrace();
+			fail("Cannot determine Java version");
+		}
+
+		// generate temporary files
+		tmpDir = CommonTestUtils.createTempDirectory();
+		ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader();
+		File file = new File(classLoader.getResource("cassandra.yaml").getFile());
+		File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
+		
+		assertTrue(tmp.createNewFile());
+		BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
+
+		//copy cassandra.yaml; inject absolute paths into cassandra.yaml
+		Scanner scanner = new Scanner(file);
+		while (scanner.hasNextLine()) {
+			String line = scanner.nextLine();
+			line = line.replace("$PATH", "'" + tmp.getParentFile());
+			b.write(line + "\n");
+			b.flush();
+		}
+		scanner.close();
+
+
+		// Tell cassandra where the configuration files are.
+		// Use the test configuration file.
+		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
+
+		if (EMBEDDED) {
+			cassandra = new EmbeddedCassandraService();
+			cassandra.start();
+		}
+
+		try {
+			Thread.sleep(1000 * 10);
+		} catch (InterruptedException e) { //give cassandra a few seconds to start up
+		}
+
+		cluster = builder.getCluster();
+		session = cluster.connect();
+
+		session.execute(CREATE_KEYSPACE_QUERY);
+		session.execute(CREATE_TABLE_QUERY);
+	}
+
+	@BeforeClass
+	public static void startFlink() throws Exception {
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+
+		flinkCluster = new ForkableFlinkMiniCluster(config);
+		flinkCluster.start();
+	}
+
+	@AfterClass
+	public static void stopFlink() {
+		flinkCluster.stop();
+	}
+
+	@AfterClass
+	public static void closeCassandra() {
+		if (session != null) {
+			session.executeAsync(DROP_KEYSPACE_QUERY);
+			session.close();
+		}
+
+		if (cluster != null) {
+			cluster.close();
+		}
+
+		if (cassandra != null) {
+			cassandra.stop();
+		}
+
+		if (tmpDir != null) {
+			//noinspection ResultOfMethodCallIgnored
+			tmpDir.delete();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test preparation & cleanup
+	// ------------------------------------------------------------------------
+
+	@Before
+	public void initializeExecutionEnvironment() {
+		TestStreamEnvironment.setAsContext(flinkCluster, 4);
+		new TestEnvironment(flinkCluster, 4, false).setAsContext();
+	}
+
+	@After
+	public void deleteSchema() throws Exception {
+		session.executeAsync(CLEAR_TABLE_QUERY);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Exactly-once Tests
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
+		return new CassandraTupleWriteAheadSink<>(
+			INSERT_DATA_QUERY,
+			TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
+			builder,
+			new CassandraCommitter(builder));
+	}
+
+	@Override
+	protected TupleTypeInfo<Tuple3<String, Integer, Integer>> createTypeInfo() {
+		return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class);
+	}
+
+	@Override
+	protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
+		return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID);
+	}
+
+	@Override
+	protected void verifyResultsIdealCircumstances(
+		OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
+		OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
+		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		ArrayList<Integer> list = new ArrayList<>();
+		for (int x = 1; x <= 60; x++) {
+			list.add(x);
+		}
+
+		for (Row s : result) {
+			list.remove(new Integer(s.getInt("counter")));
+		}
+		Assert.assertTrue("The following IDs were not found in the ResultSet: " + list.toString(), list.isEmpty());
+	}
+
+	@Override
+	protected void verifyResultsDataPersistenceUponMissedNotify(
+		OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
+		OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
+		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		ArrayList<Integer> list = new ArrayList<>();
+		for (int x = 1; x <= 60; x++) {
+			list.add(x);
+		}
+
+		for (Row s : result) {
+			list.remove(new Integer(s.getInt("counter")));
+		}
+		Assert.assertTrue("The following IDs were not found in the ResultSet: " + list.toString(), list.isEmpty());
+	}
+
+	@Override
+	protected void verifyResultsDataDiscardingUponRestore(
+		OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
+		OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
+		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		ArrayList<Integer> list = new ArrayList<>();
+		for (int x = 1; x <= 20; x++) {
+			list.add(x);
+		}
+		for (int x = 41; x <= 60; x++) {
+			list.add(x);
+		}
+
+		for (Row s : result) {
+			list.remove(new Integer(s.getInt("counter")));
+		}
+		Assert.assertTrue("The following IDs were not found in the ResultSet: " + list.toString(), list.isEmpty());
+	}
+
+	@Test
+	public void testCassandraCommitter() throws Exception {
+		CassandraCommitter cc1 = new CassandraCommitter(builder);
+		cc1.setJobId("job");
+		cc1.setOperatorId("operator");
+		cc1.setOperatorSubtaskId(0);
+
+		CassandraCommitter cc2 = new CassandraCommitter(builder);
+		cc2.setJobId("job");
+		cc2.setOperatorId("operator");
+		cc2.setOperatorSubtaskId(1);
+
+		CassandraCommitter cc3 = new CassandraCommitter(builder);
+		cc3.setJobId("job");
+		cc3.setOperatorId("operator1");
+		cc3.setOperatorSubtaskId(0);
+
+		cc1.createResource();
+
+		cc1.open();
+		cc2.open();
+		cc3.open();
+
+		Assert.assertFalse(cc1.isCheckpointCommitted(1));
+		Assert.assertFalse(cc2.isCheckpointCommitted(1));
+		Assert.assertFalse(cc3.isCheckpointCommitted(1));
+
+		cc1.commitCheckpoint(1);
+		Assert.assertTrue(cc1.isCheckpointCommitted(1));
+		//verify that other sub-tasks aren't affected
+		Assert.assertFalse(cc2.isCheckpointCommitted(1));
+		//verify that other tasks aren't affected
+		Assert.assertFalse(cc3.isCheckpointCommitted(1));
+
+		Assert.assertFalse(cc1.isCheckpointCommitted(2));
+
+		cc1.close();
+		cc2.close();
+		cc3.close();
+
+		cc1 = new CassandraCommitter(builder);
+		cc1.setJobId("job");
+		cc1.setOperatorId("operator");
+		cc1.setOperatorSubtaskId(0);
+
+		cc1.open();
+
+		//verify that checkpoint data is not destroyed within open/close and not reliant on internally cached data
+		Assert.assertTrue(cc1.isCheckpointCommitted(1));
+		Assert.assertFalse(cc1.isCheckpointCommitted(2));
+
+		cc1.close();
+	}
+
+	// ------------------------------------------------------------------------
+	//  At-least-once Tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testCassandraTupleAtLeastOnceSink() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
+		source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
+
+		env.execute();
+
+		ResultSet rs = session.execute(SELECT_DATA_QUERY);
+		Assert.assertEquals(20, rs.all().size());
+	}
+
+	@Test
+	public void testCassandraPojoAtLeastOnceSink() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStreamSource<Pojo> source = env
+			.addSource(new SourceFunction<Pojo>() {
+
+				private boolean running = true;
+				private volatile int cnt = 0;
+
+				@Override
+				public void run(SourceContext<Pojo> ctx) throws Exception {
+					while (running) {
+						ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0));
+						cnt++;
+						if (cnt == 20) {
+							cancel();
+						}
+					}
+				}
+
+				@Override
+				public void cancel() {
+					running = false;
+				}
+			});
+
+		source.addSink(new CassandraPojoSink<>(Pojo.class, builder));
+
+		env.execute();
+
+		ResultSet rs = session.execute(SELECT_DATA_QUERY);
+		Assert.assertEquals(20, rs.all().size());
+	}
+
+	@Test
+	public void testCassandraBatchFormats() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection);
+		dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
+
+		env.execute("Write data");
+
+		DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
+			new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
+			TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
+
+
+		long count = inputDS.count();
+		Assert.assertEquals(20L, count);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7cd9bb5f/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
deleted file mode 100644
index 2018255..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.QueryOptions;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-
-import org.apache.cassandra.service.CassandraDaemon;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
-import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.internal.AssumptionViolatedException;
-import org.junit.runner.RunWith;
-
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Scanner;
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("serial")
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
-public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorTest.class);
-	private static File tmpDir;
-
-	private static final boolean EMBEDDED = true;
-
-	private static EmbeddedCassandraService cassandra;
-
-	private static ClusterBuilder builder = new ClusterBuilder() {
-		@Override
-		protected Cluster buildCluster(Cluster.Builder builder) {
-			return builder
-				.addContactPoint("127.0.0.1")
-				.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
-				.withoutJMXReporting()
-				.withoutMetrics().build();
-		}
-	};
-
-	private static Cluster cluster;
-	private static Session session;
-
-	private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
-	private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;";
-	private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);";
-	private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;";
-	private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)";
-	private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;";
-
-	private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
-
-	static {
-		for (int i = 0; i < 20; i++) {
-			collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
-		}
-	}
-
-	private static class EmbeddedCassandraService {
-		CassandraDaemon cassandraDaemon;
-
-		public void start() throws IOException {
-			this.cassandraDaemon = new CassandraDaemon();
-			this.cassandraDaemon.init(null);
-			this.cassandraDaemon.start();
-		}
-
-		public void stop() {
-			this.cassandraDaemon.stop();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Cassandra Cluster Setup
-	// ------------------------------------------------------------------------
-
-	@BeforeClass
-	public static void startCassandra() throws IOException {
-
-		// check if we should run this test, current Cassandra version requires Java >= 1.8
-		try {
-			String javaVersionString = System.getProperty("java.runtime.version").substring(0, 3);
-			float javaVersion = Float.parseFloat(javaVersionString);
-			Assume.assumeTrue(javaVersion >= 1.8f);
-		}
-		catch (AssumptionViolatedException e) {
-			System.out.println("Skipping CassandraConnectorTest, because the JDK is < Java 8+");
-			throw e;
-		}
-		catch (Exception e) {
-			LOG.error("Cannot determine Java version", e);
-			e.printStackTrace();
-			fail("Cannot determine Java version");
-		}
-
-		// generate temporary files
-		tmpDir = CommonTestUtils.createTempDirectory();
-		ClassLoader classLoader = CassandraConnectorTest.class.getClassLoader();
-		File file = new File(classLoader.getResource("cassandra.yaml").getFile());
-		File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
-		
-		assertTrue(tmp.createNewFile());
-		BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
-
-		//copy cassandra.yaml; inject absolute paths into cassandra.yaml
-		Scanner scanner = new Scanner(file);
-		while (scanner.hasNextLine()) {
-			String line = scanner.nextLine();
-			line = line.replace("$PATH", "'" + tmp.getParentFile());
-			b.write(line + "\n");
-			b.flush();
-		}
-		scanner.close();
-
-
-		// Tell cassandra where the configuration files are.
-		// Use the test configuration file.
-		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
-
-		if (EMBEDDED) {
-			cassandra = new EmbeddedCassandraService();
-			cassandra.start();
-		}
-
-		try {
-			Thread.sleep(1000 * 10);
-		} catch (InterruptedException e) { //give cassandra a few seconds to start up
-		}
-
-		cluster = builder.getCluster();
-		session = cluster.connect();
-
-		session.execute(CREATE_KEYSPACE_QUERY);
-		session.execute(CREATE_TABLE_QUERY);
-	}
-
-	@Before
-	public void checkIfIgnore() {
-		
-	}
-
-	@After
-	public void deleteSchema() throws Exception {
-		session.executeAsync(CLEAR_TABLE_QUERY);
-	}
-
-	@AfterClass
-	public static void closeCassandra() {
-		if (session != null) {
-			session.executeAsync(DROP_KEYSPACE_QUERY);
-			session.close();
-		}
-
-		if (cluster != null) {
-			cluster.close();
-		}
-
-		if (cassandra != null) {
-			cassandra.stop();
-		}
-
-		if (tmpDir != null) {
-			//noinspection ResultOfMethodCallIgnored
-			tmpDir.delete();
-		}
-	}
-
-	//=====Exactly-Once=================================================================================================
-	@Override
-	protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
-		return new CassandraTupleWriteAheadSink<>(
-			INSERT_DATA_QUERY,
-			TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
-			builder,
-			new CassandraCommitter(builder));
-	}
-
-	@Override
-	protected TupleTypeInfo<Tuple3<String, Integer, Integer>> createTypeInfo() {
-		return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class);
-	}
-
-	@Override
-	protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
-		return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID);
-	}
-
-	@Override
-	protected void verifyResultsIdealCircumstances(
-		OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
-		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-
-		ResultSet result = session.execute(SELECT_DATA_QUERY);
-		ArrayList<Integer> list = new ArrayList<>();
-		for (int x = 1; x <= 60; x++) {
-			list.add(x);
-		}
-
-		for (Row s : result) {
-			list.remove(Integer.valueOf(s.getInt("counter")));
-		}
-		Assert.assertTrue("The following IDs were not found in the ResultSet: " + list, list.isEmpty());
-	}
-
-	@Override
-	protected void verifyResultsDataPersistenceUponMissedNotify(
-		OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
-		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-
-		ResultSet result = session.execute(SELECT_DATA_QUERY);
-		ArrayList<Integer> list = new ArrayList<>();
-		for (int x = 1; x <= 60; x++) {
-			list.add(x);
-		}
-
-		for (Row s : result) {
-			list.remove(Integer.valueOf(s.getInt("counter")));
-		}
-		Assert.assertTrue("The following IDs were not found in the ResultSet: " + list, list.isEmpty());
-	}
-
-	@Override
-	protected void verifyResultsDataDiscardingUponRestore(
-		OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
-		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-
-		ResultSet result = session.execute(SELECT_DATA_QUERY);
-		ArrayList<Integer> list = new ArrayList<>();
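-		// counters 21-40 belong to the checkpoint discarded on restore; verify that 1-20 and 41-60 are present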
-		for (int x = 1; x <= 20; x++) {
-			list.add(x);
-		}
-		for (int x = 41; x <= 60; x++) {
-			list.add(x);
-		}
-
-		for (Row s : result) {
-			list.remove(Integer.valueOf(s.getInt("counter")));
-		}
-		Assert.assertTrue("The following IDs were not found in the ResultSet: " + list, list.isEmpty());
-	}
-
-	@Test
-	public void testCassandraCommitter() throws Exception {
-		CassandraCommitter cc1 = new CassandraCommitter(builder);
-		cc1.setJobId("job");
-		cc1.setOperatorId("operator");
-		cc1.setOperatorSubtaskId(0);
-
-		CassandraCommitter cc2 = new CassandraCommitter(builder);
-		cc2.setJobId("job");
-		cc2.setOperatorId("operator");
-		cc2.setOperatorSubtaskId(1);
-
-		CassandraCommitter cc3 = new CassandraCommitter(builder);
-		cc3.setJobId("job");
-		cc3.setOperatorId("operator1");
-		cc3.setOperatorSubtaskId(0);
-
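-		// create the checkpoint-tracking resource once; it is shared by all committers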
-		cc1.createResource();
-
-		cc1.open();
-		cc2.open();
-		cc3.open();
-
-		Assert.assertFalse(cc1.isCheckpointCommitted(1));
-		Assert.assertFalse(cc2.isCheckpointCommitted(1));
-		Assert.assertFalse(cc3.isCheckpointCommitted(1));
-
-		cc1.commitCheckpoint(1);
-		Assert.assertTrue(cc1.isCheckpointCommitted(1));
-		//verify that other sub-tasks aren't affected
-		Assert.assertFalse(cc2.isCheckpointCommitted(1));
-		//verify that other tasks aren't affected
-		Assert.assertFalse(cc3.isCheckpointCommitted(1));
-
-		Assert.assertFalse(cc1.isCheckpointCommitted(2));
-
-		cc1.close();
-		cc2.close();
-		cc3.close();
-
-		cc1 = new CassandraCommitter(builder);
-		cc1.setJobId("job");
-		cc1.setOperatorId("operator");
-		cc1.setOperatorSubtaskId(0);
-
-		cc1.open();
-
-		//verify that checkpoint data is not destroyed within open/close and not reliant on internally cached data
-		Assert.assertTrue(cc1.isCheckpointCommitted(1));
-		Assert.assertFalse(cc1.isCheckpointCommitted(2));
-
-		cc1.close();
-	}
-
-	//=====At-Least-Once================================================================================================
-	@Test
-	public void testCassandraTupleAtLeastOnceSink() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-
-		DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
-		source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
-
-		env.execute();
-
-		ResultSet rs = session.execute(SELECT_DATA_QUERY);
-		Assert.assertEquals(20, rs.all().size());
-	}
-
-	@Test
-	public void testCassandraPojoAtLeastOnceSink() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-
-		DataStreamSource<Pojo> source = env
-			.addSource(new SourceFunction<Pojo>() {
-
-				private volatile boolean running = true;
-				private volatile int cnt = 0;
-
-				@Override
-				public void run(SourceContext<Pojo> ctx) throws Exception {
-					while (running) {
-						ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0));
-						cnt++;
-						if (cnt == 20) {
-							cancel();
-						}
-					}
-				}
-
-				@Override
-				public void cancel() {
-					running = false;
-				}
-			});
-
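-		// CassandraPojoSink persists the Pojo via the DataStax object mapper (the class is expected to carry mapping annotations), so no explicit query is needed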
-		source.addSink(new CassandraPojoSink<>(Pojo.class, builder));
-
-		env.execute();
-
-		ResultSet rs = session.execute(SELECT_DATA_QUERY);
-		Assert.assertEquals(20, rs.all().size());
-	}
-
-	@Test
-	public void testCassandraBatchFormats() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-
-		DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection);
-		dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
-
-		env.execute("Write data");
-
-		DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
-			new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
-			TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
-
-		long count = inputDS.count();
-		Assert.assertEquals(20L, count);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7cd9bb5f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
index e3df9fc..221d7da 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
@@ -25,9 +25,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -40,10 +41,9 @@ import java.util.ArrayList;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore("javax.management.*")
-public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> {
+public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> extends TestLogger {
 
-	protected class OperatorExposingTask<INT> extends OneInputStreamTask<INT, INT> {
+	protected static class OperatorExposingTask<INT> extends OneInputStreamTask<INT, INT> {
 		public OneInputStreamOperator<INT, INT> getOperator() {
 			return this.headOperator;
 		}