You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/06/27 11:17:07 UTC
flink git commit: [FLINK-4087] [metrics] Improved JMX port handling
Repository: flink
Updated Branches:
refs/heads/master 19ff8db68 -> 62cb954d9
[FLINK-4087] [metrics] Improved JMX port handling
This closes #2145
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62cb954d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62cb954d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62cb954d
Branch: refs/heads/master
Commit: 62cb954d989e3ed36f6d9ab7daeff93cda64adbb
Parents: 19ff8db
Author: zentol <ch...@apache.org>
Authored: Fri Jun 17 15:47:53 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jun 27 13:14:27 2016 +0200
----------------------------------------------------------------------
LICENSE | 15 ++
.../state/RocksDBAsyncKVSnapshotTest.java | 2 +
.../flink/storm/wrappers/BoltWrapperTest.java | 2 +
.../flink/storm/wrappers/SpoutWrapperTest.java | 2 +
.../storm/wrappers/WrapperSetupHelperTest.java | 2 +-
.../apache/flink/metrics/MetricRegistry.java | 23 ++-
.../flink/metrics/reporter/JMXReporter.java | 123 +++++++++++++++-
.../flink/metrics/groups/JobGroupTest.java | 3 +
.../groups/MetricGroupRegistrationTest.java | 16 ++-
.../flink/metrics/groups/MetricGroupTest.java | 16 ++-
.../flink/metrics/groups/OperatorGroupTest.java | 2 +
.../flink/metrics/groups/TaskGroupTest.java | 17 +++
.../metrics/groups/TaskManagerGroupTest.java | 13 +-
.../flink/metrics/reporter/JMXReporterTest.java | 141 ++++++++++++++++++-
flink-dist/src/main/flink-bin/bin/config.sh | 6 -
.../src/main/flink-bin/bin/flink-daemon.sh | 8 +-
.../io/network/api/reader/BufferReaderTest.java | 2 +
.../runtime/operators/DataSinkTaskTest.java | 2 +
.../runtime/operators/DataSourceTaskTest.java | 2 +
.../operators/chaining/ChainTaskTest.java | 3 +
.../cassandra/CassandraConnectorTest.java | 2 +-
.../api/streamtask/StreamIterationHeadTest.java | 2 +
.../operators/GenericWriteAheadSinkTest.java | 2 +-
.../runtime/operators/StreamTaskTimerTest.java | 2 +
.../runtime/tasks/OneInputStreamTaskTest.java | 2 +
.../runtime/tasks/SourceStreamTaskTest.java | 2 +
.../tasks/StreamTaskAsyncCheckpointTest.java | 2 +
.../runtime/tasks/TwoInputStreamTaskTest.java | 2 +
28 files changed, 387 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index f97195f..c6cf28e 100644
--- a/LICENSE
+++ b/LICENSE
@@ -303,3 +303,18 @@ Open Font License (OFT) - http://scripts.sil.org/OFL
- Font Awesome (http://fortawesome.github.io/Font-Awesome/) - Created by Dave Gandy
-> fonts in "flink-runtime-web/web-dashboard/assets/fonts"
+-----------------------------------------------------------------------
+ The ISC License
+-----------------------------------------------------------------------
+
+ The Apache Flink project contains code under the ISC license from the following files:
+ - simplejmx (http://256.com/sources/simplejmx/) Copyright (c) - Gray Watson
+
+Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby
+granted, provided that this permission notice appear in all copies.
+
+THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING
+ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE
+USE OR PERFORMANCE OF THIS SOFTWARE.
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
index 8eb8dfe..efdea30 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
@@ -51,6 +51,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -67,6 +68,7 @@ import static org.junit.Assert.assertTrue;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ResultPartitionWriter.class, FileSystem.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
@SuppressWarnings("serial")
public class RocksDBAsyncKVSnapshotTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index 5da12ef..2ebb917 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -49,6 +49,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -67,6 +68,7 @@ import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest({StreamRecordSerializer.class, WrapperSetupHelper.class, StreamRecord.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class BoltWrapperTest extends AbstractTest {
@Test(expected = IllegalArgumentException.class)
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
index 77cf130..e50ff5a 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
@@ -35,6 +35,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -52,6 +53,7 @@ import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(WrapperSetupHelper.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class SpoutWrapperTest extends AbstractTest {
@SuppressWarnings({ "rawtypes", "unchecked" })
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index cdf6f06..f37b547 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -51,9 +51,9 @@ import java.util.Set;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-@PowerMockIgnore("javax.*")
@RunWith(PowerMockRunner.class)
@PrepareForTest(WrapperSetupHelper.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class WrapperSetupHelperTest extends AbstractTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
index f3402e9..f283ce3 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
@@ -46,6 +46,8 @@ public class MetricRegistry {
// configuration keys
// ------------------------------------------------------------------------
+ public static final String KEY_METRICS_JMX_PORT = "metrics.jmx.port";
+
public static final String KEY_METRICS_REPORTER_CLASS = "metrics.reporter.class";
public static final String KEY_METRICS_REPORTER_ARGUMENTS = "metrics.reporter.arguments";
public static final String KEY_METRICS_REPORTER_INTERVAL = "metrics.reporter.interval";
@@ -87,7 +89,7 @@ public class MetricRegistry {
if (className == null) {
// by default, create JMX metrics
LOG.info("No metrics reporter configured, exposing metrics via JMX");
- this.reporter = new JMXReporter();
+ this.reporter = startJmxReporter(config);
this.executor = null;
}
else {
@@ -125,9 +127,9 @@ public class MetricRegistry {
}
}
catch (Throwable t) {
- reporter = new JMXReporter();
shutdownExecutor();
LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t);
+ reporter = startJmxReporter(config);
}
this.reporter = reporter;
@@ -135,6 +137,23 @@ public class MetricRegistry {
}
}
+ private static JMXReporter startJmxReporter(Configuration config) {
+ JMXReporter reporter = null;
+ try {
+ Configuration reporterConfig = new Configuration();
+ String portRange = config.getString(KEY_METRICS_JMX_PORT, null);
+ if (portRange != null) {
+ reporterConfig.setString(KEY_METRICS_JMX_PORT, portRange);
+ }
+ reporter = new JMXReporter();
+ reporter.open(reporterConfig);
+ } catch (Exception e) {
+ LOG.error("Failed to instantiate JMX reporter.", e);
+ } finally {
+ return reporter;
+ }
+ }
+
/**
* Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
index db81164..326d6d7 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
@@ -24,7 +24,7 @@ import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
-
+import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,10 +34,22 @@ import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.net.MalformedURLException;
+import java.rmi.NoSuchObjectException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_JMX_PORT;
+
/**
* {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
*
@@ -60,6 +72,9 @@ public class JMXReporter implements MetricReporter {
/** The names under which the registered metrics have been added to the MBeanServer */
private final Map<Metric, ObjectName> registeredMetrics;
+ /** The server to which JMX clients connect to. ALlows for better control over port usage. */
+ private JMXServer jmxServer;
+
/**
* Creates a new JMXReporter
*/
@@ -73,10 +88,42 @@ public class JMXReporter implements MetricReporter {
// ------------------------------------------------------------------------
@Override
- public void open(Configuration config) {}
+ public void open(Configuration config) {
+ this.jmxServer = startJmxServer(config);
+ }
+
+ private static JMXServer startJmxServer(Configuration config) {
+ Iterator<Integer> ports = NetUtils.getPortRangeFromString(config.getString(KEY_METRICS_JMX_PORT, "9010-9025"));
+
+ JMXServer server = new JMXServer();
+ while (ports.hasNext()) {
+ int port = ports.next();
+ try {
+ server.start(port);
+ LOG.info("Started JMX server on port " + port + ".");
+ return server;
+ } catch (IOException ioe) { //assume port conflict
+ LOG.debug("Could not start JMX server on port " + port + ".", ioe);
+ try {
+ server.stop();
+ } catch (Exception e) {
+ LOG.debug("Could not stop JMX server.", e);
+ }
+ }
+ }
+ throw new RuntimeException("Could not start JMX server on any configured port.");
+ }
@Override
- public void close() {}
+ public void close() {
+ if (jmxServer != null) {
+ try {
+ jmxServer.stop();
+ } catch (IOException e) {
+ LOG.error("Failed to stop JMX server.", e);
+ }
+ }
+ }
// ------------------------------------------------------------------------
// adding / removing metrics
@@ -265,4 +312,74 @@ public class JMXReporter implements MetricReporter {
return gauge.getValue();
}
}
+
+ /**
+ * JMX Server implementation that JMX clients can connect to.
+ *
+ * Heavily based on j256 simplejmx project
+ *
+ * https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
+ */
+ private static class JMXServer {
+ private Registry rmiRegistry;
+ private JMXConnectorServer connector;
+
+ public void start(int port) throws IOException {
+ if (rmiRegistry != null && connector != null) {
+ LOG.debug("JMXServer is already running.");
+ return;
+ }
+ startRmiRegistry(port);
+ startJmxService(port);
+ }
+
+ /**
+ * Starts an RMI Registry that allows clients to lookup the JMX IP/port.
+ *
+ * @param port rmi port to use
+ * @throws IOException
+ */
+ private void startRmiRegistry(int port) throws IOException {
+ rmiRegistry = LocateRegistry.createRegistry(port);
+ }
+
+ /**
+ * Starts a JMX connector that allows (un)registering MBeans with the MBean server and RMI invocations.
+ *
+ * @param port jmx port to use
+ * @throws IOException
+ */
+ private void startJmxService(int port) throws IOException {
+ String serviceUrl = "service:jmx:rmi://localhost:" + port + "/jndi/rmi://localhost:" + port + "/jmxrmi";
+ JMXServiceURL url;
+ try {
+ url = new JMXServiceURL(serviceUrl);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e);
+ }
+
+ connector = JMXConnectorServerFactory.newJMXConnectorServer(url, null, ManagementFactory.getPlatformMBeanServer());
+
+ connector.start();
+ }
+
+ public void stop() throws IOException {
+ if (connector != null) {
+ try {
+ connector.stop();
+ } finally {
+ connector = null;
+ }
+ }
+ if (rmiRegistry != null) {
+ try {
+ UnicastRemoteObject.unexportObject(rmiRegistry, true);
+ } catch (NoSuchObjectException e) {
+ throw new IOException("Could not un-export our RMI registry", e);
+ } finally {
+ rmiRegistry = null;
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
index e820762..4bcb1ee 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
@@ -45,6 +45,7 @@ public class JobGroupTest {
assertEquals(
"theHostName.taskmanager.test-tm-id.myJobName",
jmGroup.getScopeString());
+ registry.shutdown();
}
@Test
@@ -66,6 +67,7 @@ public class JobGroupTest {
assertEquals(
"some-constant.myJobName",
jmGroup.getScopeString());
+ registry.shutdown();
}
@Test
@@ -87,5 +89,6 @@ public class JobGroupTest {
assertEquals(
"peter.test-tm-id.some-constant." + jid,
jmGroup.getScopeString());
+ registry.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
index 5645b94..7b35d91 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
@@ -39,7 +39,9 @@ public class MetricGroupRegistrationTest {
Configuration config = new Configuration();
config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter1.class.getName());
- MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id");
+ MetricRegistry registry = new MetricRegistry(config);
+
+ MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
Counter counter = root.counter("counter");
assertEquals(counter, TestReporter1.lastPassedMetric);
@@ -54,6 +56,8 @@ public class MetricGroupRegistrationTest {
Assert.assertEquals(gauge, TestReporter1.lastPassedMetric);
assertEquals("gauge", TestReporter1.lastPassedName);
+
+ registry.shutdown();
}
public static class TestReporter1 extends TestReporter {
@@ -75,8 +79,12 @@ public class MetricGroupRegistrationTest {
public void testInvalidMetricName() {
Configuration config = new Configuration();
- MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id");
+ MetricRegistry registry = new MetricRegistry(config);
+
+ MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
root.counter("=)(/!");
+
+ registry.shutdown();
}
/**
@@ -86,7 +94,9 @@ public class MetricGroupRegistrationTest {
public void testDuplicateGroupName() {
Configuration config = new Configuration();
- MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id");
+ MetricRegistry registry = new MetricRegistry(config);
+
+ MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
MetricGroup group1 = root.addGroup("group");
MetricGroup group2 = root.addGroup("group");
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java
index 2849bab..3f8a577 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java
@@ -24,16 +24,30 @@ import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
public class MetricGroupTest {
- private final MetricRegistry registry = new MetricRegistry(new Configuration());
+ private MetricRegistry registry;
private final MetricRegistry exceptionOnRegister = new ExceptionOnRegisterRegistry();
+ @Before
+ public void createRegistry() {
+ this.registry = new MetricRegistry(new Configuration());
+ }
+
+ @After
+ public void shutdownRegistry() {
+ this.registry.shutdown();
+ this.registry = null;
+ }
+
@Test
public void sameGroupOnNameCollision() {
GenericMetricGroup group = new GenericMetricGroup(
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
index cb5e082..c0c8842 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
@@ -47,5 +47,7 @@ public class OperatorGroupTest {
assertEquals(
"theHostName.taskmanager.test-tm-id.myJobName.myOpName.11",
opGroup.getScopeString());
+
+ registry.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
index 4a492d2..88f425b 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat;
import org.apache.flink.util.AbstractID;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
@@ -37,6 +39,18 @@ public class TaskGroupTest {
// ------------------------------------------------------------------------
// scope tests
// ------------------------------------------------------------------------
+ private MetricRegistry registry;
+
+ @Before
+ public void createRegistry() {
+ this.registry = new MetricRegistry(new Configuration());
+ }
+
+ @After
+ public void shutdownRegistry() {
+ this.registry.shutdown();
+ this.registry = null;
+ }
@Test
public void testGenerateScopeDefault() {
@@ -56,6 +70,7 @@ public class TaskGroupTest {
assertEquals(
"theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13",
taskGroup.getScopeString());
+ registry.shutdown();
}
@Test
@@ -82,6 +97,7 @@ public class TaskGroupTest {
assertEquals(
String.format("test-tm-id.%s.%s.%s", jid, vertexId, executionId),
taskGroup.getScopeString());
+ registry.shutdown();
}
@Test
@@ -110,5 +126,6 @@ public class TaskGroupTest {
assertEquals(
"theHostName.taskmanager.test-tm-id.myJobName." + executionId + ".13",
taskGroup.getScopeString());
+ registry.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
index 9adc1be..9866b1b 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
@@ -36,8 +36,10 @@ public class TaskManagerGroupTest {
@Test
public void addAndRemoveJobs() {
+ MetricRegistry registry = new MetricRegistry(new Configuration());
+
final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
- new MetricRegistry(new Configuration()), "localhost", new AbstractID().toString());
+ registry, "localhost", new AbstractID().toString());
final JobID jid1 = new JobID();
@@ -87,12 +89,15 @@ public class TaskManagerGroupTest {
assertTrue(tmGroup13.parent().isClosed());
assertEquals(0, group.numRegisteredJobMetricGroups());
+
+ registry.shutdown();
}
@Test
public void testCloseClosesAll() {
+ MetricRegistry registry = new MetricRegistry(new Configuration());
final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
- new MetricRegistry(new Configuration()), "localhost", new AbstractID().toString());
+ registry, "localhost", new AbstractID().toString());
final JobID jid1 = new JobID();
@@ -118,6 +123,8 @@ public class TaskManagerGroupTest {
assertTrue(tmGroup11.isClosed());
assertTrue(tmGroup12.isClosed());
assertTrue(tmGroup21.isClosed());
+
+ registry.shutdown();
}
// ------------------------------------------------------------------------
@@ -131,6 +138,7 @@ public class TaskManagerGroupTest {
assertArrayEquals(new String[] { "localhost", "taskmanager", "id" }, group.getScopeComponents());
assertEquals("localhost.taskmanager.id", group.getScopeString());
+ registry.shutdown();
}
@Test
@@ -141,5 +149,6 @@ public class TaskManagerGroupTest {
assertArrayEquals(new String[] { "constant", "host", "foo", "host" }, group.getScopeComponents());
assertEquals("constant.host.foo.host", group.getScopeString());
+ registry.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
index abe1669..d25f744 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
@@ -18,9 +18,24 @@
package org.apache.flink.metrics.reporter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.metrics.util.TestReporter;
import org.junit.Test;
-import static org.junit.Assert.*;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import java.lang.management.ManagementFactory;
+
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_JMX_PORT;
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_REPORTER_CLASS;
+import static org.junit.Assert.assertEquals;
public class JMXReporterTest {
@@ -46,9 +61,131 @@ public class JMXReporterTest {
*/
@Test
public void testGenerateName() {
- String[] scope = { "value0", "value1", "\"value2 (test),=;:?'" };
+ String[] scope = {"value0", "value1", "\"value2 (test),=;:?'"};
String jmxName = JMXReporter.generateJmxName("TestMetric", scope);
assertEquals("org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=TestMetric", jmxName);
}
+
+ /**
+ * Verifies that multiple JMXReporters can be started on the same machine and register metrics at the MBeanServer.
+ *
+ * @throws Exception if the attribute/mbean could not be found or the test is broken
+ */
+ @Test
+ public void testPortConflictHandling() throws Exception {
+ Configuration cfg = new Configuration();
+ cfg.setString(KEY_METRICS_REPORTER_CLASS, TestReporter.class.getName());
+ MetricRegistry reg = new MetricRegistry(cfg);
+
+ TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
+
+ JMXReporter rep1 = new JMXReporter();
+ JMXReporter rep2 = new JMXReporter();
+
+ Configuration cfg1 = new Configuration();
+ Configuration cfg2 = new Configuration();
+
+ rep1.open(cfg1);
+ rep2.open(cfg2);
+
+ rep1.notifyOfAddedMetric(new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return 1;
+ }
+ }, "rep1", new TaskManagerMetricGroup(reg, "host", "tm"));
+
+ rep2.notifyOfAddedMetric(new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return 2;
+ }
+ }, "rep2", new TaskManagerMetricGroup(reg, "host", "tm"));
+
+ MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+
+ ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
+ ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
+
+ assertEquals(1, mBeanServer.getAttribute(objectName1, "Value"));
+ assertEquals(2, mBeanServer.getAttribute(objectName2, "Value"));
+
+ rep1.close();
+ rep2.close();
+ reg.shutdown();
+ }
+
+ /**
+ * Verifies that we can connect to multiple JMXReporters running on the same machine.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testJMXAvailability() throws Exception {
+ Configuration cfg = new Configuration();
+ cfg.setString(KEY_METRICS_REPORTER_CLASS, TestReporter.class.getName());
+ MetricRegistry reg = new MetricRegistry(cfg);
+
+ TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
+
+ JMXReporter rep1 = new JMXReporter();
+ JMXReporter rep2 = new JMXReporter();
+
+ int port1 = 9010;
+ int port2 = 9011;
+
+ Configuration cfg1 = new Configuration();
+ cfg1.setString(KEY_METRICS_JMX_PORT, String.valueOf(port1));
+ Configuration cfg2 = new Configuration();
+ cfg2.setString(KEY_METRICS_JMX_PORT, String.valueOf(port2));
+
+ rep1.open(cfg1);
+ rep2.open(cfg2);
+
+ rep1.notifyOfAddedMetric(new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return 1;
+ }
+ }, "rep1", new TaskManagerMetricGroup(reg, "host", "tm"));
+
+ rep2.notifyOfAddedMetric(new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return 2;
+ }
+ }, "rep2", new TaskManagerMetricGroup(reg, "host", "tm"));
+
+ ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
+ ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
+
+ JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + port1 + "/jndi/rmi://localhost:" + port1 + "/jmxrmi");
+ JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
+ MBeanServerConnection mCon1 = jmxCon1.getMBeanServerConnection();
+
+ assertEquals(1, mCon1.getAttribute(objectName1, "Value"));
+ assertEquals(2, mCon1.getAttribute(objectName2, "Value"));
+
+ url1 = null;
+ jmxCon1.close();
+ jmxCon1 = null;
+ mCon1 = null;
+
+ JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + port2 + "/jndi/rmi://localhost:" + port2 + "/jmxrmi");
+ JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2);
+ MBeanServerConnection mCon2 = jmxCon2.getMBeanServerConnection();
+
+ assertEquals(1, mCon2.getAttribute(objectName1, "Value"));
+ assertEquals(2, mCon2.getAttribute(objectName2, "Value"));
+
+ url2 = null;
+ jmxCon2.close();
+ jmxCon2 = null;
+ mCon2 = null;
+
+ rep1.close();
+ rep2.close();
+ reg.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 96b3122..e57e6f2 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -107,8 +107,6 @@ KEY_ENV_SSH_OPTS="env.ssh.opts"
KEY_RECOVERY_MODE="recovery.mode"
KEY_ZK_HEAP_MB="zookeeper.heap.mb"
-KEY_METRICS_JMX_PORT="metrics.jmx.port"
-
########################################################################################################################
# PATHS AND CONFIG
########################################################################################################################
@@ -258,10 +256,6 @@ if [ -z "${RECOVERY_MODE}" ]; then
RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" "${YAML_CONF}")
fi
-if [ -z "${JMX_PORT}" ]; then
- JMX_PORT=$(readFromConfig ${KEY_METRICS_JMX_PORT} 9010 "${YAML_CONF}")
-fi
-
# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
index cc7163f..5cd9f60 100644
--- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
@@ -23,7 +23,6 @@ USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (jobmanager|taskmanager|zook
STARTSTOP=$1
DAEMON=$2
ARGS=("${@:3}") # get remaining arguments as array
-JMX_ARGS=""
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
@@ -33,14 +32,10 @@ bin=`cd "$bin"; pwd`
case $DAEMON in
(jobmanager)
CLASS_TO_RUN=org.apache.flink.runtime.jobmanager.JobManager
- if [ "${ARGS[3]}" == "local" ]; then
- JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=${JMX_PORT} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
- fi
;;
(taskmanager)
CLASS_TO_RUN=org.apache.flink.runtime.taskmanager.TaskManager
- JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=${JMX_PORT} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
;;
(zookeeper)
@@ -101,13 +96,12 @@ case $STARTSTOP in
count="${#active[@]}"
if [ ${count} -gt 0 ]; then
- JMX_ARGS=""
echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
fi
fi
echo "Starting $DAEMON daemon on host $HOSTNAME."
- $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} ${JMX_ARGS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
+ $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
mypid=$!
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
index 8519ac6..099b6fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.util.event.EventListener;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -39,6 +40,7 @@ import static org.mockito.Mockito.verify;
@RunWith(PowerMockRunner.class)
@PrepareForTest(Task.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
@SuppressWarnings("unchecked")
public class BufferReaderTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index 386634f..a41c25b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -37,6 +37,7 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -57,6 +58,7 @@ import static org.junit.Assert.assertFalse;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Task.class, ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class DataSinkTaskTest extends TaskTestBase {
private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 8f0642e..95f2991 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -44,11 +44,13 @@ import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Task.class, ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class DataSourceTaskTest extends TaskTestBase {
private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index c3c23de..02c420c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -46,11 +46,14 @@ import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Task.class, ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+
public class ChainTaskTest extends TaskTestBase {
private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
index 83bb37a..8d0c02e 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
@@ -66,7 +66,7 @@ import java.util.UUID;
@RunWith(PowerMockRunner.class)
@PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore("javax.management.*")
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorTest.class);
private static File tmpDir;
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
index 8f5f8df..a047ed4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -31,6 +32,7 @@ import static org.junit.Assert.*;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ ResultPartitionWriter.class })
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class StreamIterationHeadTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
index 615fabb..8282672 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
@@ -35,7 +35,7 @@ import java.util.List;
@RunWith(PowerMockRunner.class)
@PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore("javax.management.*")
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Integer>, GenericWriteAheadSinkTest.ListSink> {
@Override
protected ListSink createSink() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index a3b8b90..153b1fd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -42,6 +43,7 @@ import static org.junit.Assert.*;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(ResultPartitionWriter.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
@SuppressWarnings("serial")
public class StreamTaskTimerTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index cd052a3..5fcc59e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -49,6 +50,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class OneInputStreamTaskTest {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index cb779b0..1e62e28 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -34,6 +34,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -53,6 +54,7 @@ import java.util.concurrent.atomic.AtomicLong;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class SourceStreamTaskTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
index ed1dd60..0e1da88 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -46,6 +47,7 @@ import static org.junit.Assert.assertTrue;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(ResultPartitionWriter.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
@SuppressWarnings("serial")
public class StreamTaskAsyncCheckpointTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/62cb954d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 7936780..b9211b1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -50,6 +51,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class TwoInputStreamTaskTest {
/**