You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/02 11:30:46 UTC
[09/50] [abbrv] ignite git commit: ignite-4492 Add MBean for
StripedExecutor This closes #1491.
ignite-4492 Add MBean for StripedExecutor
This closes #1491.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e12513e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e12513e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e12513e
Branch: refs/heads/ignite-4565-ddl
Commit: 8e12513efb24cc6df1da0968560ac932544ee68d
Parents: c52cb9f
Author: voipp <al...@gmail.com>
Authored: Tue Feb 14 15:08:59 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Feb 14 15:08:59 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 48 +++++-
.../internal/StripedExecutorMXBeanAdapter.java | 90 ++++++++++
.../ignite/internal/util/StripedExecutor.java | 55 +++++-
.../ignite/mxbean/StripedExecutorMXBean.java | 90 ++++++++++
.../internal/util/StripedExecutorTest.java | 168 +++++++++++++++++++
.../testsuites/IgniteComputeGridTestSuite.java | 2 +
6 files changed, 447 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index a3d8c7b..cdbe2e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -168,6 +168,7 @@ import org.apache.ignite.marshaller.MarshallerExclusions;
import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.mxbean.ClusterLocalNodeMetricsMXBean;
import org.apache.ignite.mxbean.IgniteMXBean;
+import org.apache.ignite.mxbean.StripedExecutorMXBean;
import org.apache.ignite.mxbean.ThreadPoolMXBean;
import org.apache.ignite.plugin.IgnitePlugin;
import org.apache.ignite.plugin.PluginNotFoundException;
@@ -296,6 +297,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
@GridToStringExclude
private ObjectName restExecSvcMBean;
+ /** */
+ @GridToStringExclude
+ private ObjectName stripedExecSvcMBean;
+
/** Kernal start timestamp. */
private long startTime = U.currentTimeMillis();
@@ -963,6 +968,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
registerKernalMBean();
registerLocalNodeMBean();
registerExecutorMBeans(execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, restExecSvc);
+ registerStripedExecutorMBean(stripedExecSvc);
// Lifecycle bean notifications.
notifyLifecycleBeans(AFTER_NODE_START);
@@ -1541,7 +1547,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
}
- /** @throws IgniteCheckedException If registration failed. */
+ /**
+ * @param execSvc
+ * @param sysExecSvc
+ * @param p2pExecSvc
+ * @param mgmtExecSvc
+ * @param restExecSvc
+ * @throws IgniteCheckedException If failed.
+ */
private void registerExecutorMBeans(ExecutorService execSvc,
ExecutorService sysExecSvc,
ExecutorService p2pExecSvc,
@@ -1582,8 +1595,34 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
return res;
}
catch (JMException e) {
- throw new IgniteCheckedException("Failed to register executor service MBean [name=" + name + ", exec=" + exec + ']',
- e);
+ throw new IgniteCheckedException("Failed to register executor service MBean [name=" + name +
+ ", exec=" + exec + ']', e);
+ }
+ }
+
+ /**
+ * @param stripedExecSvc Striped executor service to expose as an MBean; nothing is registered if {@code null}.
+ * @throws IgniteCheckedException If MBean registration fails with a {@code JMException}.
+ */
+ private void registerStripedExecutorMBean(StripedExecutor stripedExecSvc) throws IgniteCheckedException {
+ if (stripedExecSvc != null) {
+ String name = "StripedExecutor";
+
+ try {
+ stripedExecSvcMBean = U.registerMBean(
+ cfg.getMBeanServer(),
+ cfg.getGridName(),
+ "Thread Pools",
+ name,
+ new StripedExecutorMXBeanAdapter(stripedExecSvc),
+ StripedExecutorMXBean.class);
+
+ if (log.isDebugEnabled())
+ log.debug("Registered executor service MBean: " + stripedExecSvcMBean);
+ } catch (JMException e) {
+ throw new IgniteCheckedException("Failed to register executor service MBean [name="
+ + name + ", exec=" + stripedExecSvc + ']', e);
+ }
}
}
@@ -2046,7 +2085,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
unregisterMBean(p2PExecSvcMBean) &
unregisterMBean(kernalMBean) &
unregisterMBean(locNodeMBean) &
- unregisterMBean(restExecSvcMBean)
+ unregisterMBean(restExecSvcMBean) &
+ unregisterMBean(stripedExecSvcMBean)
))
errOnStop = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
new file mode 100644
index 0000000..e6811b7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.concurrent.ExecutorService;
+import org.apache.ignite.internal.util.StripedExecutor;
+import org.apache.ignite.mxbean.StripedExecutorMXBean;
+
+/**
+ * Adapter for {@link StripedExecutorMXBean} which delegates all method calls to the underlying
+ * {@link StripedExecutor} instance.
+ */
+public class StripedExecutorMXBeanAdapter implements StripedExecutorMXBean {
+ /** Striped executor that all calls are delegated to. */
+ private final StripedExecutor exec;
+
+ /**
+ * @param exec Striped executor to delegate to; must not be {@code null}.
+ */
+ StripedExecutorMXBeanAdapter(StripedExecutor exec) {
+ assert exec != null;
+
+ this.exec = exec;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkStarvation() {
+ exec.checkStarvation();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getStripesCount() {
+ return exec.stripes();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isShutdown() {
+ return exec.isShutdown();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isTerminated() {
+ return exec.isTerminated();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getTotalQueueSize() {
+ return exec.queueSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getTotalCompletedTasksCount() {
+ return exec.completedTasks();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long[] getStripesCompletedTasksCounts() {
+ return exec.stripesCompletedTasks();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getActiveCount() {
+ return exec.activeStripesCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean[] getStripesActiveStatuses() {
+ return exec.stripesActiveStatuses();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int[] getStripesQueueSizes() {
+ return exec.stripesQueueSizes();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 201cb34..e70f0ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -55,9 +55,10 @@ public class StripedExecutor implements ExecutorService {
private final IgniteLogger log;
/**
- * Constructor.
- *
* @param cnt Count.
+ * @param gridName Node name.
+ * @param poolName Pool name.
+ * @param log Logger.
*/
public StripedExecutor(int cnt, String gridName, String poolName, final IgniteLogger log) {
A.ensure(cnt > 0, "cnt > 0");
@@ -268,6 +269,56 @@ public class StripedExecutor implements ExecutorService {
}
/**
+ * @return Completed tasks count of each stripe, indexed by stripe number.
+ */
+ public long[] stripesCompletedTasks() {
+ long[] res = new long[stripes()];
+
+ for (int i = 0; i < res.length; i++)
+ res[i] = stripes[i].completedCnt;
+
+ return res;
+ }
+
+ /**
+ * @return Active flag of each stripe, indexed by stripe number.
+ */
+ public boolean[] stripesActiveStatuses() {
+ boolean[] res = new boolean[stripes()];
+
+ for (int i = 0; i < res.length; i++)
+ res[i] = stripes[i].active;
+
+ return res;
+ }
+
+ /**
+ * @return Number of stripes whose active flag is currently set.
+ */
+ public int activeStripesCount() {
+ int activeCnt = 0;
+
+ for (int i = 0, total = stripes(); i < total; i++) {
+ if (stripes[i].active)
+ activeCnt++;
+ }
+
+ return activeCnt;
+ }
+
+ /**
+ * @return Queue size of each stripe, indexed by stripe number.
+ */
+ public int[] stripesQueueSizes() {
+ int[] res = new int[stripes()];
+
+ for (int i = 0; i < res.length; i++)
+ res[i] = stripes[i].queueSize();
+
+ return res;
+ }
+
+ /**
* Operation not supported.
*/
@NotNull @Override public <T> Future<T> submit(
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java
new file mode 100644
index 0000000..7428b19
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mxbean;
+
+/**
+ * MBean that provides access to information about striped executor service.
+ */
+@MXBeanDescription("MBean that provides access to information about striped executor service.")
+public interface StripedExecutorMXBean {
+ /**
+ * Checks for starvation in the striped pool and dumps information to the log if potential
+ * starvation was found.
+ */
+ @MXBeanDescription("Checks for starvation in striped pool.")
+ public void checkStarvation();
+
+ /**
+ * @return Stripes count.
+ */
+ @MXBeanDescription("Stripes count.")
+ public int getStripesCount();
+
+ /**
+ *
+ * @return {@code True} if this executor has been shut down.
+ */
+ @MXBeanDescription("True if this executor has been shut down.")
+ public boolean isShutdown();
+
+ /**
+ * Note that
+ * {@code isTerminated()} is never {@code true} unless either {@code shutdown()} or
+ * {@code shutdownNow()} was called first.
+ *
+ * @return {@code True} if all tasks have completed following shut down.
+ */
+ @MXBeanDescription("True if all tasks have completed following shut down.")
+ public boolean isTerminated();
+
+ /**
+ * @return Total queue size of all stripes.
+ */
+ @MXBeanDescription("Total queue size of all stripes.")
+ public int getTotalQueueSize();
+
+ /**
+ * @return Completed tasks count.
+ */
+ @MXBeanDescription("Completed tasks count of all stripes.")
+ public long getTotalCompletedTasksCount();
+
+ /**
+ * @return Number of completed tasks per stripe.
+ */
+ @MXBeanDescription("Number of completed tasks per stripe.")
+ public long[] getStripesCompletedTasksCounts();
+
+ /**
+ * @return Number of active tasks.
+ */
+ @MXBeanDescription("Number of active tasks of all stripes.")
+ public int getActiveCount();
+
+ /**
+ * @return Active status of each stripe, indexed by stripe number.
+ */
+ @MXBeanDescription("Active status of each stripe.")
+ public boolean[] getStripesActiveStatuses();
+
+ /**
+ * @return Size of queue per stripe.
+ */
+ @MXBeanDescription("Size of queue per stripe.")
+ public int[] getStripesQueueSizes();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
new file mode 100644
index 0000000..543907f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import org.apache.ignite.logger.java.JavaLogger;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests for the per-stripe statistics exposed by {@link StripedExecutor}.
+ */
+public class StripedExecutorTest extends GridCommonAbstractTest {
+ /** Executor under test; recreated with 3 stripes before each test. */
+ private StripedExecutor stripedExecSvc;
+
+ /** {@inheritDoc} */
+ @Override public void beforeTest() {
+ stripedExecSvc = new StripedExecutor(3, "foo name", "pool name", new JavaLogger());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void afterTest() {
+ stripedExecSvc.shutdown();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCompletedTasks() throws Exception {
+ stripedExecSvc.execute(0, new TestRunnable());
+ stripedExecSvc.execute(1, new TestRunnable());
+
+ sleepASec();
+
+ assertEquals(2, stripedExecSvc.completedTasks());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStripesCompletedTasks() throws Exception {
+ stripedExecSvc.execute(0, new TestRunnable());
+ stripedExecSvc.execute(1, new TestRunnable());
+
+ sleepASec();
+
+ long[] completedTasks = stripedExecSvc.stripesCompletedTasks();
+
+ assertEquals(1, completedTasks[0]);
+ assertEquals(1, completedTasks[1]);
+ assertEquals(0, completedTasks[2]);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStripesActiveStatuses() throws Exception {
+ stripedExecSvc.execute(0, new TestRunnable());
+ stripedExecSvc.execute(1, new TestRunnable(true));
+
+ sleepASec();
+
+ boolean[] statuses = stripedExecSvc.stripesActiveStatuses();
+
+ assertFalse(statuses[0]);
+ assertTrue(statuses[1]);
+ assertFalse(statuses[2]);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testActiveStripesCount() throws Exception {
+ stripedExecSvc.execute(0, new TestRunnable());
+ stripedExecSvc.execute(1, new TestRunnable(true));
+
+ sleepASec();
+
+ assertEquals(1, stripedExecSvc.activeStripesCount());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStripesQueueSizes() throws Exception {
+ stripedExecSvc.execute(0, new TestRunnable());
+ stripedExecSvc.execute(0, new TestRunnable(true));
+ stripedExecSvc.execute(0, new TestRunnable(true));
+ stripedExecSvc.execute(1, new TestRunnable(true));
+ stripedExecSvc.execute(1, new TestRunnable(true));
+ stripedExecSvc.execute(1, new TestRunnable(true));
+
+ sleepASec();
+
+ int[] queueSizes = stripedExecSvc.stripesQueueSizes();
+
+ assertEquals(1, queueSizes[0]);
+ assertEquals(2, queueSizes[1]);
+ assertEquals(0, queueSizes[2]);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueueSize() throws Exception {
+ stripedExecSvc.execute(1, new TestRunnable());
+ stripedExecSvc.execute(1, new TestRunnable(true));
+ stripedExecSvc.execute(1, new TestRunnable(true));
+
+ sleepASec();
+
+ assertEquals(1, stripedExecSvc.queueSize());
+ }
+
+ /**
+ * Task that either returns immediately or keeps sleeping until it is interrupted.
+ */
+ private final class TestRunnable implements Runnable {
+ /** */
+ private final boolean infinitely;
+
+ /**
+ * Creates a task that returns immediately.
+ */
+ public TestRunnable() {
+ this(false);
+ }
+
+ /**
+ * @param infinitely {@code True} if should sleep infinitely.
+ */
+ public TestRunnable(boolean infinitely) {
+ this.infinitely = infinitely;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ while (infinitely)
+ sleepASec();
+ }
+ catch (InterruptedException e) {
+ info("Got interrupted exception while sleeping: " + e);
+ }
+ }
+ }
+
+ /**
+ * @throws InterruptedException If interrupted.
+ */
+ private void sleepASec() throws InterruptedException {
+ Thread.sleep(1000);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 8a501fd..9a80b10 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManagerSelfT
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointTaskSelfTest;
import org.apache.ignite.internal.managers.communication.GridCommunicationManagerListenersSelfTest;
import org.apache.ignite.internal.processors.compute.PublicThreadpoolStarvationTest;
+import org.apache.ignite.internal.util.StripedExecutorTest;
import org.apache.ignite.p2p.GridMultinodeRedeployContinuousModeSelfTest;
import org.apache.ignite.p2p.GridMultinodeRedeployIsolatedModeSelfTest;
import org.apache.ignite.p2p.GridMultinodeRedeployPrivateModeSelfTest;
@@ -152,6 +153,7 @@ public class IgniteComputeGridTestSuite {
suite.addTestSuite(TaskNodeRestartTest.class);
suite.addTestSuite(IgniteRoundRobinErrorAfterClientReconnectTest.class);
suite.addTestSuite(PublicThreadpoolStarvationTest.class);
+ suite.addTestSuite(StripedExecutorTest.class);
return suite;
}