You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/02 11:30:46 UTC

[09/50] [abbrv] ignite git commit: ignite-4492 Add MBean for StripedExecutor This closes #1491.

ignite-4492 Add MBean for StripedExecutor
This closes #1491.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e12513e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e12513e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e12513e

Branch: refs/heads/ignite-4565-ddl
Commit: 8e12513efb24cc6df1da0968560ac932544ee68d
Parents: c52cb9f
Author: voipp <al...@gmail.com>
Authored: Tue Feb 14 15:08:59 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Feb 14 15:08:59 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  48 +++++-
 .../internal/StripedExecutorMXBeanAdapter.java  |  90 ++++++++++
 .../ignite/internal/util/StripedExecutor.java   |  55 +++++-
 .../ignite/mxbean/StripedExecutorMXBean.java    |  90 ++++++++++
 .../internal/util/StripedExecutorTest.java      | 168 +++++++++++++++++++
 .../testsuites/IgniteComputeGridTestSuite.java  |   2 +
 6 files changed, 447 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index a3d8c7b..cdbe2e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -168,6 +168,7 @@ import org.apache.ignite.marshaller.MarshallerExclusions;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
 import org.apache.ignite.mxbean.ClusterLocalNodeMetricsMXBean;
 import org.apache.ignite.mxbean.IgniteMXBean;
+import org.apache.ignite.mxbean.StripedExecutorMXBean;
 import org.apache.ignite.mxbean.ThreadPoolMXBean;
 import org.apache.ignite.plugin.IgnitePlugin;
 import org.apache.ignite.plugin.PluginNotFoundException;
@@ -296,6 +297,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     @GridToStringExclude
     private ObjectName restExecSvcMBean;
 
+    /** */
+    @GridToStringExclude
+    private ObjectName stripedExecSvcMBean;
+
     /** Kernal start timestamp. */
     private long startTime = U.currentTimeMillis();
 
@@ -963,6 +968,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             registerKernalMBean();
             registerLocalNodeMBean();
             registerExecutorMBeans(execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, restExecSvc);
+            registerStripedExecutorMBean(stripedExecSvc);
 
             // Lifecycle bean notifications.
             notifyLifecycleBeans(AFTER_NODE_START);
@@ -1541,7 +1547,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         }
     }
 
-    /** @throws IgniteCheckedException If registration failed. */
+    /**
+     * @param execSvc
+     * @param sysExecSvc
+     * @param p2pExecSvc
+     * @param mgmtExecSvc
+     * @param restExecSvc
+     * @throws IgniteCheckedException If failed.
+     */
     private void registerExecutorMBeans(ExecutorService execSvc,
         ExecutorService sysExecSvc,
         ExecutorService p2pExecSvc,
@@ -1582,8 +1595,34 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             return res;
         }
         catch (JMException e) {
-            throw new IgniteCheckedException("Failed to register executor service MBean [name=" + name + ", exec=" + exec + ']',
-                e);
+            throw new IgniteCheckedException("Failed to register executor service MBean [name=" + name +
+                ", exec=" + exec + ']', e);
+        }
+    }
+
+    /**
+     * @param stripedExecSvc Executor service.
+     * @throws IgniteCheckedException If registration failed.
+     */
+    private void registerStripedExecutorMBean(StripedExecutor stripedExecSvc) throws IgniteCheckedException {
+        if (stripedExecSvc != null) {
+            String name = "StripedExecutor";
+
+            try {
+                stripedExecSvcMBean = U.registerMBean(
+                    cfg.getMBeanServer(),
+                    cfg.getGridName(),
+                    "Thread Pools",
+                    name,
+                    new StripedExecutorMXBeanAdapter(stripedExecSvc),
+                    StripedExecutorMXBean.class);
+
+                if (log.isDebugEnabled())
+                    log.debug("Registered executor service MBean: " + stripedExecSvcMBean);
+            } catch (JMException e) {
+                throw new IgniteCheckedException("Failed to register executor service MBean [name="
+                    + name + ", exec=" + stripedExecSvc + ']', e);
+            }
         }
     }
 
@@ -2046,7 +2085,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                     unregisterMBean(p2PExecSvcMBean) &
                     unregisterMBean(kernalMBean) &
                     unregisterMBean(locNodeMBean) &
-                    unregisterMBean(restExecSvcMBean)
+                    unregisterMBean(restExecSvcMBean) &
+                    unregisterMBean(stripedExecSvcMBean)
             ))
                 errOnStop = false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
new file mode 100644
index 0000000..e6811b7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.concurrent.ExecutorService;
+import org.apache.ignite.internal.util.StripedExecutor;
+import org.apache.ignite.mxbean.StripedExecutorMXBean;
+
+/**
+ * Adapter for {@link StripedExecutorMXBean} which delegates all method calls to the underlying
+ * {@link ExecutorService} instance.
+ */
+public class StripedExecutorMXBeanAdapter implements StripedExecutorMXBean {
+    /** */
+    private final StripedExecutor exec;
+
+    /**
+     * @param exec Executor service
+     */
+    StripedExecutorMXBeanAdapter(StripedExecutor exec) {
+        assert exec != null;
+
+        this.exec = exec;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void checkStarvation() {
+        exec.checkStarvation();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getStripesCount() {
+        return exec.stripes();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isShutdown() {
+        return exec.isShutdown();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isTerminated() {
+        return exec.isTerminated();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getTotalQueueSize() {
+        return exec.queueSize();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getTotalCompletedTasksCount() {
+        return exec.completedTasks();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long[] getStripesCompletedTasksCounts() {
+        return exec.stripesCompletedTasks();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getActiveCount() {
+        return exec.activeStripesCount();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean[] getStripesActiveStatuses() {
+        return exec.stripesActiveStatuses();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int[] getStripesQueueSizes() {
+        return exec.stripesQueueSizes();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 201cb34..e70f0ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -55,9 +55,10 @@ public class StripedExecutor implements ExecutorService {
     private final IgniteLogger log;
 
     /**
-     * Constructor.
-     *
      * @param cnt Count.
+     * @param gridName Node name.
+     * @param poolName Pool name.
+     * @param log Logger.
      */
     public StripedExecutor(int cnt, String gridName, String poolName, final IgniteLogger log) {
         A.ensure(cnt > 0, "cnt > 0");
@@ -268,6 +269,56 @@ public class StripedExecutor implements ExecutorService {
     }
 
     /**
+     * @return Completed tasks per stripe count.
+     */
+    public long[] stripesCompletedTasks() {
+        long[] res = new long[stripes()];
+
+        for (int i = 0; i < res.length; i++)
+            res[i] = stripes[i].completedCnt;
+
+        return res;
+    }
+
+    /**
+     * @return Number of active tasks per stripe.
+     */
+    public boolean[] stripesActiveStatuses() {
+        boolean[] res = new boolean[stripes()];
+
+        for (int i = 0; i < res.length; i++)
+            res[i] = stripes[i].active;
+
+        return res;
+    }
+
+    /**
+     * @return Number of active tasks.
+     */
+    public int activeStripesCount() {
+        int res = 0;
+
+        for (boolean status : stripesActiveStatuses()) {
+            if (status)
+                res++;
+        }
+
+        return res;
+    }
+
+    /**
+     * @return Size of queue per stripe.
+     */
+    public int[] stripesQueueSizes() {
+        int[] res = new int[stripes()];
+
+        for (int i = 0; i < res.length; i++)
+            res[i] = stripes[i].queueSize();
+
+        return res;
+    }
+
+    /**
      * Operation not supported.
      */
     @NotNull @Override public <T> Future<T> submit(

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java
new file mode 100644
index 0000000..7428b19
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mxbean;
+
+/**
+ * MBean that provides access to information about striped executor service.
+ */
+@MXBeanDescription("MBean that provides access to information about striped executor service.")
+public interface StripedExecutorMXBean {
+    /**
+     * Checks for starvation in striped pool, dumps in log information if potential starvation
+     * was found.
+     */
+    @MXBeanDescription("Checks for starvation in striped pool.")
+    public void checkStarvation();
+
+    /**
+     * @return Stripes count.
+     */
+    @MXBeanDescription("Stripes count.")
+    public int getStripesCount();
+
+    /**
+     *
+     * @return {@code True} if this executor has been shut down.
+     */
+    @MXBeanDescription("True if this executor has been shut down.")
+    public boolean isShutdown();
+
+    /**
+     * Note that
+     * {@code isTerminated()} is never {@code true} unless either {@code shutdown()} or
+     * {@code shutdownNow()} was called first.
+     *
+     * @return {@code True} if all tasks have completed following shut down.
+     */
+    @MXBeanDescription("True if all tasks have completed following shut down.")
+    public boolean isTerminated();
+
+    /**
+     * @return Return total queue size of all stripes.
+     */
+    @MXBeanDescription("Total queue size of all stripes.")
+    public int getTotalQueueSize();
+
+    /**
+     * @return Completed tasks count.
+     */
+    @MXBeanDescription("Completed tasks count of all stripes.")
+    public long getTotalCompletedTasksCount();
+
+    /**
+     * @return Number of completed tasks per stripe.
+     */
+    @MXBeanDescription("Number of completed tasks per stripe.")
+    public long[] getStripesCompletedTasksCounts();
+
+    /**
+     * @return Number of active tasks.
+     */
+    @MXBeanDescription("Number of active tasks of all stripes.")
+    public int getActiveCount();
+
+    /**
+     * @return Number of active tasks per stripe.
+     */
+    @MXBeanDescription("Number of active tasks per stripe.")
+    public boolean[] getStripesActiveStatuses();
+
+    /**
+     * @return Size of queue per stripe.
+     */
+    @MXBeanDescription("Size of queue per stripe.")
+    public int[] getStripesQueueSizes();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
new file mode 100644
index 0000000..543907f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import org.apache.ignite.logger.java.JavaLogger;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class StripedExecutorTest extends GridCommonAbstractTest {
+    /** */
+    private StripedExecutor stripedExecSvc;
+
+    /** {@inheritDoc} */
+    @Override public void beforeTest() {
+        stripedExecSvc = new StripedExecutor(3, "foo name", "pool name", new JavaLogger());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void afterTest() {
+        stripedExecSvc.shutdown();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCompletedTasks() throws Exception {
+        stripedExecSvc.execute(0, new TestRunnable());
+        stripedExecSvc.execute(1, new TestRunnable());
+
+        sleepASec();
+
+        assertEquals(2, stripedExecSvc.completedTasks());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStripesCompletedTasks() throws Exception {
+        stripedExecSvc.execute(0, new TestRunnable());
+        stripedExecSvc.execute(1, new TestRunnable());
+
+        sleepASec();
+
+        long[] completedTaks = stripedExecSvc.stripesCompletedTasks();
+
+        assertEquals(1, completedTaks[0]);
+        assertEquals(1, completedTaks[1]);
+        assertEquals(0, completedTaks[2]);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStripesActiveStatuses() throws Exception {
+        stripedExecSvc.execute(0, new TestRunnable());
+        stripedExecSvc.execute(1, new TestRunnable(true));
+
+        sleepASec();
+
+        boolean[] statuses = stripedExecSvc.stripesActiveStatuses();
+
+        assertFalse(statuses[0]);
+        assertTrue(statuses[1]);
+        assertFalse(statuses[0]);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testActiveStripesCount() throws Exception {
+        stripedExecSvc.execute(0, new TestRunnable());
+        stripedExecSvc.execute(1, new TestRunnable(true));
+
+        sleepASec();
+
+        assertEquals(1, stripedExecSvc.activeStripesCount());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStripesQueueSizes() throws Exception {
+        stripedExecSvc.execute(0, new TestRunnable());
+        stripedExecSvc.execute(0, new TestRunnable(true));
+        stripedExecSvc.execute(0, new TestRunnable(true));
+        stripedExecSvc.execute(1, new TestRunnable(true));
+        stripedExecSvc.execute(1, new TestRunnable(true));
+        stripedExecSvc.execute(1, new TestRunnable(true));
+
+        sleepASec();
+
+        int[] queueSizes = stripedExecSvc.stripesQueueSizes();
+
+        assertEquals(1, queueSizes[0]);
+        assertEquals(2, queueSizes[1]);
+        assertEquals(0, queueSizes[2]);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueueSize() throws Exception {
+        stripedExecSvc.execute(1, new TestRunnable());
+        stripedExecSvc.execute(1, new TestRunnable(true));
+        stripedExecSvc.execute(1, new TestRunnable(true));
+
+        sleepASec();
+
+        assertEquals(1, stripedExecSvc.queueSize());
+    }
+
+    /**
+     *
+     */
+    private final class TestRunnable implements Runnable {
+        /** */
+        private final boolean infinitely;
+
+        /**
+         *
+         */
+        public TestRunnable() {
+            this(false);
+        }
+
+        /**
+         * @param infinitely {@code True} if should sleep infinitely.
+         */
+        public TestRunnable(boolean infinitely) {
+            this.infinitely = infinitely;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            try {
+                while (infinitely)
+                    sleepASec();
+            }
+            catch (InterruptedException e) {
+                info("Got interrupted exception while sleeping: " + e);
+            }
+        }
+    }
+
+    /**
+     * @throws InterruptedException If interrupted.
+     */
+    private void sleepASec() throws InterruptedException {
+        Thread.sleep(1000);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 8a501fd..9a80b10 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManagerSelfT
 import org.apache.ignite.internal.managers.checkpoint.GridCheckpointTaskSelfTest;
 import org.apache.ignite.internal.managers.communication.GridCommunicationManagerListenersSelfTest;
 import org.apache.ignite.internal.processors.compute.PublicThreadpoolStarvationTest;
+import org.apache.ignite.internal.util.StripedExecutorTest;
 import org.apache.ignite.p2p.GridMultinodeRedeployContinuousModeSelfTest;
 import org.apache.ignite.p2p.GridMultinodeRedeployIsolatedModeSelfTest;
 import org.apache.ignite.p2p.GridMultinodeRedeployPrivateModeSelfTest;
@@ -152,6 +153,7 @@ public class IgniteComputeGridTestSuite {
         suite.addTestSuite(TaskNodeRestartTest.class);
         suite.addTestSuite(IgniteRoundRobinErrorAfterClientReconnectTest.class);
         suite.addTestSuite(PublicThreadpoolStarvationTest.class);
+        suite.addTestSuite(StripedExecutorTest.class);
 
         return suite;
     }