You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/06/27 11:17:07 UTC

flink git commit: [FLINK-4087] [metrics] Improved JMX port handling

Repository: flink
Updated Branches:
  refs/heads/master 19ff8db68 -> 62cb954d9


[FLINK-4087] [metrics] Improved JMX port handling

This closes #2145


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62cb954d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62cb954d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62cb954d

Branch: refs/heads/master
Commit: 62cb954d989e3ed36f6d9ab7daeff93cda64adbb
Parents: 19ff8db
Author: zentol <ch...@apache.org>
Authored: Fri Jun 17 15:47:53 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jun 27 13:14:27 2016 +0200

----------------------------------------------------------------------
 LICENSE                                         |  15 ++
 .../state/RocksDBAsyncKVSnapshotTest.java       |   2 +
 .../flink/storm/wrappers/BoltWrapperTest.java   |   2 +
 .../flink/storm/wrappers/SpoutWrapperTest.java  |   2 +
 .../storm/wrappers/WrapperSetupHelperTest.java  |   2 +-
 .../apache/flink/metrics/MetricRegistry.java    |  23 ++-
 .../flink/metrics/reporter/JMXReporter.java     | 123 +++++++++++++++-
 .../flink/metrics/groups/JobGroupTest.java      |   3 +
 .../groups/MetricGroupRegistrationTest.java     |  16 ++-
 .../flink/metrics/groups/MetricGroupTest.java   |  16 ++-
 .../flink/metrics/groups/OperatorGroupTest.java |   2 +
 .../flink/metrics/groups/TaskGroupTest.java     |  17 +++
 .../metrics/groups/TaskManagerGroupTest.java    |  13 +-
 .../flink/metrics/reporter/JMXReporterTest.java | 141 ++++++++++++++++++-
 flink-dist/src/main/flink-bin/bin/config.sh     |   6 -
 .../src/main/flink-bin/bin/flink-daemon.sh      |   8 +-
 .../io/network/api/reader/BufferReaderTest.java |   2 +
 .../runtime/operators/DataSinkTaskTest.java     |   2 +
 .../runtime/operators/DataSourceTaskTest.java   |   2 +
 .../operators/chaining/ChainTaskTest.java       |   3 +
 .../cassandra/CassandraConnectorTest.java       |   2 +-
 .../api/streamtask/StreamIterationHeadTest.java |   2 +
 .../operators/GenericWriteAheadSinkTest.java    |   2 +-
 .../runtime/operators/StreamTaskTimerTest.java  |   2 +
 .../runtime/tasks/OneInputStreamTaskTest.java   |   2 +
 .../runtime/tasks/SourceStreamTaskTest.java     |   2 +
 .../tasks/StreamTaskAsyncCheckpointTest.java    |   2 +
 .../runtime/tasks/TwoInputStreamTaskTest.java   |   2 +
 28 files changed, 387 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index f97195f..c6cf28e 100644
--- a/LICENSE
+++ b/LICENSE
@@ -303,3 +303,18 @@ Open Font License (OFT) - http://scripts.sil.org/OFL
  - Font Awesome (http://fortawesome.github.io/Font-Awesome/) - Created by Dave Gandy
    -> fonts in "flink-runtime-web/web-dashboard/assets/fonts"
 
+-----------------------------------------------------------------------
+ The ISC License
+-----------------------------------------------------------------------
+ 
+ The Apache Flink project contains code under the ISC license from the following files:
+  - simplejmx (http://256.com/sources/simplejmx/) Copyright (c) - Gray Watson
+
+Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby
+granted, provided that this permission notice appear in all copies.
+
+THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING
+ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE
+USE OR PERFORMANCE OF THIS SOFTWARE.

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
index 8eb8dfe..efdea30 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
@@ -51,6 +51,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -67,6 +68,7 @@ import static org.junit.Assert.assertTrue;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ResultPartitionWriter.class, FileSystem.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 @SuppressWarnings("serial")
 public class RocksDBAsyncKVSnapshotTest {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index 5da12ef..2ebb917 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -49,6 +49,7 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -67,6 +68,7 @@ import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({StreamRecordSerializer.class, WrapperSetupHelper.class, StreamRecord.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class BoltWrapperTest extends AbstractTest {
 
 	@Test(expected = IllegalArgumentException.class)

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
index 77cf130..e50ff5a 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
@@ -35,6 +35,7 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -52,6 +53,7 @@ import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(WrapperSetupHelper.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class SpoutWrapperTest extends AbstractTest {
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index cdf6f06..f37b547 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -51,9 +51,9 @@ import java.util.Set;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@PowerMockIgnore("javax.*")
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(WrapperSetupHelper.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class WrapperSetupHelperTest extends AbstractTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
index f3402e9..f283ce3 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
@@ -46,6 +46,8 @@ public class MetricRegistry {
 	//  configuration keys
 	// ------------------------------------------------------------------------
 
+	public static final String KEY_METRICS_JMX_PORT = "metrics.jmx.port";
+
 	public static final String KEY_METRICS_REPORTER_CLASS = "metrics.reporter.class";
 	public static final String KEY_METRICS_REPORTER_ARGUMENTS = "metrics.reporter.arguments";
 	public static final String KEY_METRICS_REPORTER_INTERVAL = "metrics.reporter.interval";
@@ -87,7 +89,7 @@ public class MetricRegistry {
 		if (className == null) {
 			// by default, create JMX metrics
 			LOG.info("No metrics reporter configured, exposing metrics via JMX");
-			this.reporter = new JMXReporter();
+			this.reporter = startJmxReporter(config);
 			this.executor = null;
 		}
 		else {
@@ -125,9 +127,9 @@ public class MetricRegistry {
 				}
 			}
 			catch (Throwable t) {
-				reporter = new JMXReporter();
 				shutdownExecutor();
 				LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t);
+				reporter = startJmxReporter(config);
 			}
 
 			this.reporter = reporter;
@@ -135,6 +137,23 @@ public class MetricRegistry {
 		}
 	}
 
+	private static JMXReporter startJmxReporter(Configuration config) {
+		JMXReporter reporter = null;
+		try {
+			Configuration reporterConfig = new Configuration();
+			String portRange = config.getString(KEY_METRICS_JMX_PORT, null);
+			if (portRange != null) {
+				reporterConfig.setString(KEY_METRICS_JMX_PORT, portRange);
+			}
+			reporter = new JMXReporter();
+			reporter.open(reporterConfig);
+		} catch (Exception e) {
+			LOG.error("Failed to instantiate JMX reporter.", e);
+		} finally {
+			return reporter;
+		}
+	}
+
 	/**
 	 * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
index db81164..326d6d7 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
@@ -24,7 +24,7 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.groups.AbstractMetricGroup;
-
+import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,10 +34,22 @@ import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.net.MalformedURLException;
+import java.rmi.NoSuchObjectException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_JMX_PORT;
+
 /**
  * {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
  *
@@ -60,6 +72,9 @@ public class JMXReporter implements MetricReporter {
 	/** The names under which the registered metrics have been added to the MBeanServer */ 
 	private final Map<Metric, ObjectName> registeredMetrics;
 
+	/** The server to which JMX clients connect. Allows for better control over port usage. */
+	private JMXServer jmxServer;
+
 	/**
 	 * Creates a new JMXReporter
 	 */
@@ -73,10 +88,42 @@ public class JMXReporter implements MetricReporter {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void open(Configuration config) {}
+	public void open(Configuration config) {
+		this.jmxServer = startJmxServer(config);
+	}
+
+	private static JMXServer startJmxServer(Configuration config) {
+		Iterator<Integer> ports = NetUtils.getPortRangeFromString(config.getString(KEY_METRICS_JMX_PORT, "9010-9025"));
+
+		JMXServer server = new JMXServer();
+		while (ports.hasNext()) {
+			int port = ports.next();
+			try {
+				server.start(port);
+				LOG.info("Started JMX server on port " + port + ".");
+				return server;
+			} catch (IOException ioe) { //assume port conflict
+				LOG.debug("Could not start JMX server on port " + port + ".", ioe);
+				try {
+					server.stop();
+				} catch (Exception e) {
+					LOG.debug("Could not stop JMX server.", e);
+				}
+			}
+		}
+		throw new RuntimeException("Could not start JMX server on any configured port.");
+	}
 
 	@Override
-	public void close() {}
+	public void close() {
+		if (jmxServer != null) {
+			try {
+				jmxServer.stop();
+			} catch (IOException e) {
+				LOG.error("Failed to stop JMX server.", e);
+			}
+		}
+	}
 
 	// ------------------------------------------------------------------------
 	//  adding / removing metrics
@@ -265,4 +312,74 @@ public class JMXReporter implements MetricReporter {
 			return gauge.getValue();
 		}
 	}
+
+	/**
+	 * JMX Server implementation that JMX clients can connect to.
+	 *
+	 * Heavily based on the j256 simplejmx project.
+	 *
+	 * https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
+	 */
+	private static class JMXServer {
+		private Registry rmiRegistry;
+		private JMXConnectorServer connector;
+
+		public void start(int port) throws IOException {
+			if (rmiRegistry != null && connector != null) {
+				LOG.debug("JMXServer is already running.");
+				return;
+			}
+			startRmiRegistry(port);
+			startJmxService(port);
+		}
+
+		/**
+		 * Starts an RMI Registry that allows clients to lookup the JMX IP/port.
+		 *
+		 * @param port rmi port to use
+		 * @throws IOException
+		 */
+		private void startRmiRegistry(int port) throws IOException {
+			rmiRegistry = LocateRegistry.createRegistry(port);
+		}
+
+		/**
+		 * Starts a JMX connector that allows (un)registering MBeans with the MBean server and RMI invocations.
+		 *
+		 * @param port jmx port to use
+		 * @throws IOException
+		 */
+		private void startJmxService(int port) throws IOException {
+			String serviceUrl = "service:jmx:rmi://localhost:" + port + "/jndi/rmi://localhost:" + port + "/jmxrmi";
+			JMXServiceURL url;
+			try {
+				url = new JMXServiceURL(serviceUrl);
+			} catch (MalformedURLException e) {
+				throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e);
+			}
+
+			connector = JMXConnectorServerFactory.newJMXConnectorServer(url, null, ManagementFactory.getPlatformMBeanServer());
+
+			connector.start();
+		}
+
+		public void stop() throws IOException {
+			if (connector != null) {
+				try {
+					connector.stop();
+				} finally {
+					connector = null;
+				}
+			}
+			if (rmiRegistry != null) {
+				try {
+					UnicastRemoteObject.unexportObject(rmiRegistry, true);
+				} catch (NoSuchObjectException e) {
+					throw new IOException("Could not un-export our RMI registry", e);
+				} finally {
+					rmiRegistry = null;
+				}
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
index e820762..4bcb1ee 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
@@ -45,6 +45,7 @@ public class JobGroupTest {
 		assertEquals(
 				"theHostName.taskmanager.test-tm-id.myJobName",
 				jmGroup.getScopeString());
+		registry.shutdown();
 	}
 
 	@Test
@@ -66,6 +67,7 @@ public class JobGroupTest {
 		assertEquals(
 				"some-constant.myJobName",
 				jmGroup.getScopeString());
+		registry.shutdown();
 	}
 
 	@Test
@@ -87,5 +89,6 @@ public class JobGroupTest {
 		assertEquals(
 				"peter.test-tm-id.some-constant." + jid,
 				jmGroup.getScopeString());
+		registry.shutdown();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
index 5645b94..7b35d91 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
@@ -39,7 +39,9 @@ public class MetricGroupRegistrationTest {
 		Configuration config = new Configuration();
 		config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter1.class.getName());
 
-		MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id");
+		MetricRegistry registry = new MetricRegistry(config);
+
+		MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
 
 		Counter counter = root.counter("counter");
 		assertEquals(counter, TestReporter1.lastPassedMetric);
@@ -54,6 +56,8 @@ public class MetricGroupRegistrationTest {
 		
 		Assert.assertEquals(gauge, TestReporter1.lastPassedMetric);
 		assertEquals("gauge", TestReporter1.lastPassedName);
+
+		registry.shutdown();
 	}
 
 	public static class TestReporter1 extends TestReporter {
@@ -75,8 +79,12 @@ public class MetricGroupRegistrationTest {
 	public void testInvalidMetricName() {
 		Configuration config = new Configuration();
 
-		MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id");
+		MetricRegistry registry = new MetricRegistry(config);
+
+		MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
 		root.counter("=)(/!");
+
+		registry.shutdown();
 	}
 
 	/**
@@ -86,7 +94,9 @@ public class MetricGroupRegistrationTest {
 	public void testDuplicateGroupName() {
 		Configuration config = new Configuration();
 
-		MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id");
+		MetricRegistry registry = new MetricRegistry(config);
+
+		MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
 
 		MetricGroup group1 = root.addGroup("group");
 		MetricGroup group2 = root.addGroup("group");

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java
index 2849bab..3f8a577 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java
@@ -24,16 +24,30 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
 
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
 public class MetricGroupTest {
 	
-	private final MetricRegistry registry = new MetricRegistry(new Configuration());
+	private MetricRegistry registry;
 
 	private final MetricRegistry exceptionOnRegister = new ExceptionOnRegisterRegistry();
 
+	@Before
+	public void createRegistry() {
+		this.registry = new MetricRegistry(new Configuration());
+	}
+
+	@After
+	public void shutdownRegistry() {
+		this.registry.shutdown();
+		this.registry = null;
+	}
+
 	@Test
 	public void sameGroupOnNameCollision() {
 		GenericMetricGroup group = new GenericMetricGroup(

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
index cb5e082..c0c8842 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
@@ -47,5 +47,7 @@ public class OperatorGroupTest {
 		assertEquals(
 				"theHostName.taskmanager.test-tm-id.myJobName.myOpName.11",
 				opGroup.getScopeString());
+
+		registry.shutdown();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
index 4a492d2..88f425b 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
 import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat;
 import org.apache.flink.util.AbstractID;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -37,6 +39,18 @@ public class TaskGroupTest {
 	// ------------------------------------------------------------------------
 	//  scope tests
 	// ------------------------------------------------------------------------
+	private MetricRegistry registry;
+
+	@Before
+	public void createRegistry() {
+		this.registry = new MetricRegistry(new Configuration());
+	}
+
+	@After
+	public void shutdownRegistry() {
+		this.registry.shutdown();
+		this.registry = null;
+	}
 
 	@Test
 	public void testGenerateScopeDefault() {
@@ -56,6 +70,7 @@ public class TaskGroupTest {
 		assertEquals(
 				"theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13",
 				taskGroup.getScopeString());
+		registry.shutdown();
 	}
 
 	@Test
@@ -82,6 +97,7 @@ public class TaskGroupTest {
 		assertEquals(
 				String.format("test-tm-id.%s.%s.%s", jid, vertexId, executionId),
 				taskGroup.getScopeString());
+		registry.shutdown();
 	}
 
 	@Test
@@ -110,5 +126,6 @@ public class TaskGroupTest {
 		assertEquals(
 				"theHostName.taskmanager.test-tm-id.myJobName." + executionId + ".13",
 				taskGroup.getScopeString());
+		registry.shutdown();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
index 9adc1be..9866b1b 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
@@ -36,8 +36,10 @@ public class TaskManagerGroupTest {
 
 	@Test
 	public void addAndRemoveJobs() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+
 		final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
-				new MetricRegistry(new Configuration()), "localhost", new AbstractID().toString());
+				registry, "localhost", new AbstractID().toString());
 		
 		
 		final JobID jid1 = new JobID();
@@ -87,12 +89,15 @@ public class TaskManagerGroupTest {
 		assertTrue(tmGroup13.parent().isClosed());
 		
 		assertEquals(0, group.numRegisteredJobMetricGroups());
+
+		registry.shutdown();
 	}
 
 	@Test
 	public void testCloseClosesAll() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
 		final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
-				new MetricRegistry(new Configuration()), "localhost", new AbstractID().toString());
+				registry, "localhost", new AbstractID().toString());
 
 
 		final JobID jid1 = new JobID();
@@ -118,6 +123,8 @@ public class TaskManagerGroupTest {
 		assertTrue(tmGroup11.isClosed());
 		assertTrue(tmGroup12.isClosed());
 		assertTrue(tmGroup21.isClosed());
+
+		registry.shutdown();
 	}
 	
 	// ------------------------------------------------------------------------
@@ -131,6 +138,7 @@ public class TaskManagerGroupTest {
 
 		assertArrayEquals(new String[] { "localhost", "taskmanager", "id" }, group.getScopeComponents());
 		assertEquals("localhost.taskmanager.id", group.getScopeString());
+		registry.shutdown();
 	}
 
 	@Test
@@ -141,5 +149,6 @@ public class TaskManagerGroupTest {
 
 		assertArrayEquals(new String[] { "constant", "host", "foo", "host" }, group.getScopeComponents());
 		assertEquals("constant.host.foo.host", group.getScopeString());
+		registry.shutdown();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
index abe1669..d25f744 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
@@ -18,9 +18,24 @@
 
 package org.apache.flink.metrics.reporter;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.metrics.util.TestReporter;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import java.lang.management.ManagementFactory;
+
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_JMX_PORT;
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_REPORTER_CLASS;
+import static org.junit.Assert.assertEquals;
 
 public class JMXReporterTest {
 
@@ -46,9 +61,131 @@ public class JMXReporterTest {
 	 */
 	@Test
 	public void testGenerateName() {
-		String[] scope = { "value0", "value1", "\"value2 (test),=;:?'" };
+		String[] scope = {"value0", "value1", "\"value2 (test),=;:?'"};
 		String jmxName = JMXReporter.generateJmxName("TestMetric", scope);
 
 		assertEquals("org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=TestMetric", jmxName);
 	}
+
+	/**
+	 * Verifies that multiple JMXReporters can be started on the same machine and register metrics at the MBeanServer.
+	 *
+	 * @throws Exception if the attribute/mbean could not be found or the test is broken
+	 */
+	@Test
+	public void testPortConflictHandling() throws Exception {
+		Configuration cfg = new Configuration();
+		cfg.setString(KEY_METRICS_REPORTER_CLASS, TestReporter.class.getName());
+		MetricRegistry reg = new MetricRegistry(cfg);
+
+		TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
+
+		JMXReporter rep1 = new JMXReporter();
+		JMXReporter rep2 = new JMXReporter();
+
+		Configuration cfg1 = new Configuration();
+		Configuration cfg2 = new Configuration();
+
+		rep1.open(cfg1);
+		rep2.open(cfg2);
+
+		rep1.notifyOfAddedMetric(new Gauge<Integer>() {
+			@Override
+			public Integer getValue() {
+				return 1;
+			}
+		}, "rep1", new TaskManagerMetricGroup(reg, "host", "tm"));
+
+		rep2.notifyOfAddedMetric(new Gauge<Integer>() {
+			@Override
+			public Integer getValue() {
+				return 2;
+			}
+		}, "rep2", new TaskManagerMetricGroup(reg, "host", "tm"));
+
+		MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+
+		ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
+		ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
+
+		assertEquals(1, mBeanServer.getAttribute(objectName1, "Value"));
+		assertEquals(2, mBeanServer.getAttribute(objectName2, "Value"));
+
+		rep1.close();
+		rep2.close();
+		reg.shutdown();
+	}
+
+	/**
+	 * Verifies that we can connect to multiple JMXReporters running on the same machine.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testJMXAvailability() throws Exception {
+		Configuration cfg = new Configuration();
+		cfg.setString(KEY_METRICS_REPORTER_CLASS, TestReporter.class.getName());
+		MetricRegistry reg = new MetricRegistry(cfg);
+
+		TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
+
+		JMXReporter rep1 = new JMXReporter();
+		JMXReporter rep2 = new JMXReporter();
+
+		int port1 = 9010;
+		int port2 = 9011;
+
+		Configuration cfg1 = new Configuration();
+		cfg1.setString(KEY_METRICS_JMX_PORT, String.valueOf(port1));
+		Configuration cfg2 = new Configuration();
+		cfg2.setString(KEY_METRICS_JMX_PORT, String.valueOf(port2));
+
+		rep1.open(cfg1);
+		rep2.open(cfg2);
+
+		rep1.notifyOfAddedMetric(new Gauge<Integer>() {
+			@Override
+			public Integer getValue() {
+				return 1;
+			}
+		}, "rep1", new TaskManagerMetricGroup(reg, "host", "tm"));
+
+		rep2.notifyOfAddedMetric(new Gauge<Integer>() {
+			@Override
+			public Integer getValue() {
+				return 2;
+			}
+		}, "rep2", new TaskManagerMetricGroup(reg, "host", "tm"));
+
+		ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
+		ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
+
+		JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + port1 + "/jndi/rmi://localhost:" + port1 + "/jmxrmi");
+		JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
+		MBeanServerConnection mCon1 = jmxCon1.getMBeanServerConnection();
+
+		assertEquals(1, mCon1.getAttribute(objectName1, "Value"));
+		assertEquals(2, mCon1.getAttribute(objectName2, "Value"));
+
+		url1 = null;
+		jmxCon1.close();
+		jmxCon1 = null;
+		mCon1 = null;
+
+		JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + port2 + "/jndi/rmi://localhost:" + port2 + "/jmxrmi");
+		JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2);
+		MBeanServerConnection mCon2 = jmxCon2.getMBeanServerConnection();
+
+		assertEquals(1, mCon2.getAttribute(objectName1, "Value"));
+		assertEquals(2, mCon2.getAttribute(objectName2, "Value"));
+
+		url2 = null;
+		jmxCon2.close();
+		jmxCon2 = null;
+		mCon2 = null;
+
+		rep1.close();
+		rep2.close();
+		reg.shutdown();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 96b3122..e57e6f2 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -107,8 +107,6 @@ KEY_ENV_SSH_OPTS="env.ssh.opts"
 KEY_RECOVERY_MODE="recovery.mode"
 KEY_ZK_HEAP_MB="zookeeper.heap.mb"
 
-KEY_METRICS_JMX_PORT="metrics.jmx.port"
-
 ########################################################################################################################
 # PATHS AND CONFIG
 ########################################################################################################################
@@ -258,10 +256,6 @@ if [ -z "${RECOVERY_MODE}" ]; then
     RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" "${YAML_CONF}")
 fi
 
-if [ -z "${JMX_PORT}" ]; then
-    JMX_PORT=$(readFromConfig ${KEY_METRICS_JMX_PORT} 9010 "${YAML_CONF}")
-fi
-
 # Arguments for the JVM. Used for job and task manager JVMs.
 # DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
 # KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
index cc7163f..5cd9f60 100644
--- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
@@ -23,7 +23,6 @@ USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (jobmanager|taskmanager|zook
 STARTSTOP=$1
 DAEMON=$2
 ARGS=("${@:3}") # get remaining arguments as array
-JMX_ARGS=""
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
@@ -33,14 +32,10 @@ bin=`cd "$bin"; pwd`
 case $DAEMON in
     (jobmanager)
         CLASS_TO_RUN=org.apache.flink.runtime.jobmanager.JobManager
-        if [ "${ARGS[3]}" == "local" ]; then
-            JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=${JMX_PORT} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
-        fi
     ;;
 
     (taskmanager)
         CLASS_TO_RUN=org.apache.flink.runtime.taskmanager.TaskManager
-        JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=${JMX_PORT} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
     ;;
 
     (zookeeper)
@@ -101,13 +96,12 @@ case $STARTSTOP in
           count="${#active[@]}"
 
           if [ ${count} -gt 0 ]; then
-            JMX_ARGS=""
             echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
           fi
         fi
 
         echo "Starting $DAEMON daemon on host $HOSTNAME."
-        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} ${JMX_ARGS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
+        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
 
         mypid=$!
 

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
index 8519ac6..099b6fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -39,6 +40,7 @@ import static org.mockito.Mockito.verify;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(Task.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 @SuppressWarnings("unchecked")
 public class BufferReaderTest {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index 386634f..a41c25b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -37,6 +37,7 @@ import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -57,6 +58,7 @@ import static org.junit.Assert.assertFalse;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({Task.class, ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class DataSinkTaskTest extends TaskTestBase {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 8f0642e..95f2991 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -44,11 +44,13 @@ import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({Task.class, ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class DataSourceTaskTest extends TaskTestBase {
 
 	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index c3c23de..02c420c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -46,11 +46,14 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({Task.class, ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+
 public class ChainTaskTest extends TaskTestBase {
 	
 	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
index 83bb37a..8d0c02e 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
@@ -66,7 +66,7 @@ import java.util.UUID;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore("javax.management.*")
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
 	private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorTest.class);
 	private static File tmpDir;

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
index 8f5f8df..a047ed4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -31,6 +32,7 @@ import static org.junit.Assert.*;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ ResultPartitionWriter.class })
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class StreamIterationHeadTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
index 615fabb..8282672 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
@@ -35,7 +35,7 @@ import java.util.List;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore("javax.management.*")
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Integer>, GenericWriteAheadSinkTest.ListSink> {
 	@Override
 	protected ListSink createSink() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index a3b8b90..153b1fd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -42,6 +43,7 @@ import static org.junit.Assert.*;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(ResultPartitionWriter.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 @SuppressWarnings("serial")
 public class StreamTaskTimerTest {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index cd052a3..5fcc59e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -49,6 +50,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class OneInputStreamTaskTest {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index cb779b0..1e62e28 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -34,6 +34,7 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -53,6 +54,7 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class SourceStreamTaskTest {
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
index ed1dd60..0e1da88 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -46,6 +47,7 @@ import static org.junit.Assert.assertTrue;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(ResultPartitionWriter.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 @SuppressWarnings("serial")
 public class StreamTaskAsyncCheckpointTest {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 7936780..b9211b1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -50,6 +51,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class TwoInputStreamTaskTest {
 
 	/**