You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/05/15 15:03:37 UTC

[1/2] ignite git commit: Added IO latency test + made it available from MBean

Repository: ignite
Updated Branches:
  refs/heads/master feda3ff52 -> 5b16b51c8


Added IO latency test + made it available from MBean


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8195ae06
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8195ae06
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8195ae06

Branch: refs/heads/master
Commit: 8195ae0655255979d02d425a6ca11fb720d460d3
Parents: ed4a7a1
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Mon May 15 18:03:07 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon May 15 18:03:07 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  14 ++
 .../managers/communication/GridIoManager.java   | 206 ++++++++++++++++++-
 .../org/apache/ignite/mxbean/IgniteMXBean.java  |  44 ++++
 3 files changed, 263 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ae06/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8ba6a88..2c8c964 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -3960,6 +3960,20 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public void runIoTest(
+        long warmup,
+        long duration,
+        int threads,
+        long maxLatency,
+        int rangesCnt,
+        int payLoadSize,
+        boolean procFromNioThread
+    ) {
+        // The test runs against a copy of the remote server node collection taken at the moment of the call.
+        // Diamond instead of the raw ArrayList keeps the argument type-checked against List<ClusterNode>.
+        ctx.io().runIoTest(warmup, duration, threads, maxLatency, rangesCnt, payLoadSize, procFromNioThread,
+            new ArrayList<>(ctx.cluster().get().forServers().forRemotes().nodes()));
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteKernal.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ae06/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9d35c75..fc94667 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -21,17 +21,23 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -86,6 +92,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
+import org.jsr166.LongAdder8;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -99,11 +106,11 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGF
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SCHEMA_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SERVICE_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.isReservedGridIoPolicy;
 import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.threadProcessingMessage;
 import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
@@ -433,6 +440,203 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         return map;
     }
 
+    /**
+     * Runs an IO latency test against the given nodes.
+     * <p>
+     * {@code threads} worker threads repeatedly send a test message with a {@code payLoadSize}-byte payload
+     * to a randomly picked node and wait for the reply, timing every round trip. One more thread coordinates
+     * the run: it switches the test from warmup to measurement, logs throughput every 5 seconds, stops the
+     * workers once {@code duration} has elapsed and logs the merged per-node latency histograms.
+     * <p>
+     * The call only schedules the test and returns; progress and results are written to the log.
+     * <p>
+     * A histogram consists of {@code rangesCnt} equal ranges covering {@code [0, maxLatency)} followed by
+     * one extra counter for round trips that took {@code maxLatency} or longer. Round trips started during
+     * warmup are counted in the throughput figures only.
+     *
+     * @param warmup Warmup duration in milliseconds.
+     * @param duration Test duration in milliseconds.
+     * @param threads Thread count.
+     * @param maxLatency Max latency in nanoseconds.
+     * @param rangesCnt Ranges count in resulting histogram.
+     * @param payLoadSize Payload size in bytes.
+     * @param procFromNioThread {@code True} to process requests in NIO threads.
+     * @param nodes Nodes participating in test. If empty, the test is not started.
+     */
+    public void runIoTest(
+        final long warmup,
+        final long duration,
+        final int threads,
+        final long maxLatency,
+        final int rangesCnt,
+        final int payLoadSize,
+        final boolean procFromNioThread,
+        final List<ClusterNode> nodes
+    ) {
+        // Without target nodes every worker would fail on its first iteration and the coordinator
+        // would then wait on the final barrier for workers that are already gone.
+        if (nodes.isEmpty()) {
+            U.warn(log, "IO test was not started: no nodes to send test messages to.");
+
+            return;
+        }
+
+        ExecutorService svc = Executors.newFixedThreadPool(threads + 1);
+
+        final AtomicBoolean warmupFinished = new AtomicBoolean();
+        final AtomicBoolean done = new AtomicBoolean();
+        final CyclicBarrier bar = new CyclicBarrier(threads + 1);
+        final LongAdder8 cnt = new LongAdder8();
+        final long sleepDuration = 5000;
+        final byte[] payLoad = new byte[payLoadSize];
+        final Map<UUID, long[]>[] res = new Map[threads];
+
+        boolean failed = true;
+
+        try {
+            // Coordinator: drives warmup/measurement phases and reports throughput and final results.
+            svc.execute(new Runnable() {
+                @Override public void run() {
+                    boolean failed = true;
+
+                    try {
+                        // Start together with all workers.
+                        bar.await();
+
+                        long start = System.currentTimeMillis();
+
+                        if (log.isInfoEnabled())
+                            log.info("IO test started " +
+                                "[warmup=" + warmup +
+                                ", duration=" + duration +
+                                ", threads=" + threads +
+                                ", maxLatency=" + maxLatency +
+                                ", rangesCnt=" + rangesCnt +
+                                ", payLoadSize=" + payLoadSize +
+                                ", procFromNioThreads=" + procFromNioThread + ']'
+                            );
+
+                        for (;;) {
+                            if (!warmupFinished.get() && System.currentTimeMillis() - start > warmup) {
+                                if (log.isInfoEnabled())
+                                    log.info("IO test warmup finished.");
+
+                                warmupFinished.set(true);
+
+                                // Measurement phase is timed from the end of warmup.
+                                start = System.currentTimeMillis();
+                            }
+
+                            if (warmupFinished.get() && System.currentTimeMillis() - start > duration) {
+                                if (log.isInfoEnabled())
+                                    log.info("IO test finished, will wait for all threads to finish.");
+
+                                done.set(true);
+
+                                // Wait until every worker has left its loop.
+                                bar.await();
+
+                                failed = false;
+
+                                break;
+                            }
+
+                            if (log.isInfoEnabled())
+                                log.info("IO test [opsCnt/sec=" + (cnt.sumThenReset() * 1000 / sleepDuration) +
+                                    ", warmup=" + !warmupFinished.get() +
+                                    ", elapsed=" + (System.currentTimeMillis() - start) + ']');
+
+                            Thread.sleep(sleepDuration);
+                        }
+
+                        // At this point all threads have finished the test and stored data to the result map.
+                        Map<UUID, long[]> res0 = new HashMap<>();
+
+                        // Merge per-thread histograms into one histogram per node.
+                        for (Map<UUID, long[]> r : res) {
+                            for (Entry<UUID, long[]> e : r.entrySet()) {
+                                long[] r0 = res0.get(e.getKey());
+
+                                if (r0 == null)
+                                    res0.put(e.getKey(), e.getValue());
+                                else {
+                                    for (int i = 0; i < rangesCnt + 1; i++)
+                                        r0[i] += e.getValue()[i];
+                                }
+                            }
+                        }
+
+                        StringBuilder b = new StringBuilder("IO test results " +
+                            "[range=" + (maxLatency / (1000 * rangesCnt)) + "mcs]");
+
+                        b.append(U.nl());
+
+                        for (Entry<UUID, long[]> e : res0.entrySet()) {
+                            ClusterNode node = ctx.discovery().node(e.getKey());
+
+                            b.append("    ").append(e.getKey()).append(" (addrs=")
+                                .append(node != null ? node.addresses().toString() : "n/a").append(')')
+                                .append(Arrays.toString(e.getValue())).append(U.nl());
+                        }
+
+                        if (log.isInfoEnabled())
+                            log.info(b.toString());
+                    }
+                    catch (InterruptedException e) {
+                        // Keep the interrupt visible to the code that owns this thread.
+                        Thread.currentThread().interrupt();
+
+                        U.error(log, "IO test failed.", e);
+                    }
+                    catch (BrokenBarrierException e) {
+                        U.error(log, "IO test failed.", e);
+                    }
+                    finally {
+                        if (failed)
+                            bar.reset();
+                    }
+                }
+            });
+
+            for (int i = 0; i < threads; i++) {
+                final int i0 = i;
+
+                // Each worker writes only to its own map, so no synchronization is needed while the test runs.
+                res[i] = U.newHashMap(nodes.size());
+
+                svc.execute(new Runnable() {
+                    @Override public void run() {
+                        boolean failed = true;
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+                        int size = nodes.size();
+                        Map<UUID, long[]> res0 = res[i0];
+
+                        try {
+                            boolean warmupFinished0 = false;
+
+                            bar.await();
+
+                            for (;;) {
+                                if (done.get())
+                                    break;
+
+                                // Once warmup is over the shared flag does not have to be re-read.
+                                if (!warmupFinished0)
+                                    warmupFinished0 = warmupFinished.get();
+
+                                ClusterNode node = nodes.get(rnd.nextInt(size));
+
+                                long start = System.nanoTime();
+
+                                sendIoTest(node, payLoad, procFromNioThread).get();
+
+                                long latency = System.nanoTime() - start;
+
+                                cnt.increment();
+
+                                // Warmup round trips count towards throughput but stay out of the histogram.
+                                if (!warmupFinished0)
+                                    continue;
+
+                                long[] latencies = res0.get(node.id());
+
+                                if (latencies == null)
+                                    res0.put(node.id(), latencies = new long[rangesCnt + 1]);
+
+                                if (latency >= maxLatency)
+                                    latencies[rangesCnt]++; // Timed out.
+                                else {
+                                    int idx = (int)Math.floor((1.0 * latency) / ((1.0 * maxLatency) / rangesCnt));
+
+                                    latencies[idx]++;
+                                }
+                            }
+
+                            // Tell the coordinator that this worker has stored all its results.
+                            bar.await();
+
+                            failed = false;
+                        }
+                        catch (Exception e) {
+                            if (e instanceof InterruptedException)
+                                Thread.currentThread().interrupt();
+
+                            U.error(log, "IO test worker thread failed.", e);
+                        }
+                        finally {
+                            // NOTE(review): reset() only releases threads that are waiting on the barrier right
+                            // now; threads reaching the final await() later will still wait for this worker.
+                            // Confirm whether a failing worker can stall the rest of the run.
+                            if (failed)
+                                bar.reset();
+                        }
+                    }
+                });
+            }
+
+            // Everything is submitted: already submitted tasks keep running, and the pool threads
+            // terminate when the tasks complete instead of staying parked forever.
+            svc.shutdown();
+
+            failed = false;
+        }
+        finally {
+            if (failed)
+                U.shutdownNow(GridIoManager.class, svc, log);
+        }
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"})
     @Override public void onKernalStart0() throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ae06/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
index bf84b0d..ce63e4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
@@ -385,4 +385,48 @@ public interface IgniteMXBean {
      */
     @MXBeanDescription("Dumps debug information for the current node.")
     public void dumpDebugInfo();
+
+    /**
+     * Runs IO latency test against all remote server nodes in cluster.
+     * <p>
+     * Test messages carrying {@code payLoadSize} bytes are sent from {@code threads} threads to randomly
+     * picked nodes, and the time until the response arrives is measured. Measured latencies are grouped
+     * into {@code rangesCnt} histogram ranges; latencies of {@code maxLatency} or more are counted
+     * separately. Progress and the resulting per-node histograms are printed to the node log.
+     *
+     * @param warmup Warmup duration in milliseconds.
+     * @param duration Test duration in milliseconds, counted after warmup has finished.
+     * @param threads Thread count.
+     * @param maxLatency Max latency in nanoseconds (upper bound of the histogram).
+     * @param rangesCnt Ranges count in resulting histogram.
+     * @param payLoadSize Payload size in bytes.
+     * @param procFromNioThread {@code True} to process requests in NIO threads.
+     */
+    @MXBeanDescription("Runs IO latency test against all remote server nodes in cluster.")
+    @MXBeanParametersNames(
+        {
+            "warmup",
+            "duration",
+            "threads",
+            "maxLatency",
+            "rangesCnt",
+            "payLoadSize",
+            "procFromNioThread"
+        }
+    )
+    @MXBeanParametersDescriptions(
+        {
+            "Warmup duration (millis).",
+            "Test duration (millis).",
+            "Threads count.",
+            "Maximum latency expected (nanos).",
+            "Ranges count for histogram.",
+            "Payload size (bytes).",
+            "Process requests in NIO-threads flag."
+        }
+    )
+    void runIoTest(
+        long warmup,
+        long duration,
+        int threads,
+        long maxLatency,
+        int rangesCnt,
+        int payLoadSize,
+        boolean procFromNioThread
+    );
 }


[2/2] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by yz...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5b16b51c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5b16b51c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5b16b51c

Branch: refs/heads/master
Commit: 5b16b51c8779ed3d129ac45621e73dd903fe65dd
Parents: 8195ae0 feda3ff
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Mon May 15 18:03:30 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon May 15 18:03:30 2017 +0300

----------------------------------------------------------------------
 .../DistributedRegressionExample.java           | 149 ++++++++++++
 .../processors/cache/GridCacheAdapter.java      |  71 +-----
 .../cache/GridCacheConcurrentMap.java           |   7 +-
 .../cache/GridCacheConcurrentMapImpl.java       |  30 +--
 .../processors/cache/GridCacheProxyImpl.java    |  60 -----
 .../processors/cache/GridNoStorageCacheMap.java |   7 +-
 .../processors/cache/IgniteInternalCache.java   |  61 -----
 .../dht/GridCachePartitionedConcurrentMap.java  |  13 +-
 .../distributed/dht/GridDhtCacheAdapter.java    | 162 -------------
 .../distributed/dht/GridDhtLocalPartition.java  |   4 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   2 +-
 .../distributed/near/GridNearCacheAdapter.java  |  17 --
 .../processors/cache/GridCacheLeakTest.java     |   4 +-
 .../GridCachePartitionedFullApiSelfTest.java    |  18 --
 .../cache/eviction/EvictionAbstractTest.java    |  13 +-
 .../IgniteCacheClientNearCacheExpiryTest.java   |   2 +-
 .../hadoop/jobtracker/HadoopJobTracker.java     |  20 +-
 .../cache/hibernate/HibernateCacheProxy.java    |  25 --
 .../apache/ignite/ml/math/util/MatrixUtil.java  |   3 +-
 .../org/apache/ignite/ml/IgniteMLTestSuite.java |  35 +++
 .../ml/math/MathImplDistributedTestSuite.java   |   2 +-
 .../ignite/ml/math/MathImplLocalTestSuite.java  |   7 +-
 .../ignite/ml/math/MathImplMainTestSuite.java   |   2 +-
 .../OLSMultipleLinearRegressionTest.java        |   7 +
 .../ml/regressions/RegressionsTestSuite.java    |  32 +++
 .../Binary/BinarySelfTest.cs                    |  26 ++
 .../Impl/Binary/BinaryReflectiveActions.cs      |   6 +-
 .../BinaryReflectiveSerializerInternal.cs       |   9 +-
 .../Impl/Binary/BinaryUtils.cs                  |   9 -
 .../Binary/DeserializationCallbackProcessor.cs  |  11 +
 .../Impl/Binary/SerializableSerializer.cs       |  10 +-
 .../activities-user-dialog.tpl.pug              |  33 +--
 .../components/web-console-header/style.scss    |   9 -
 .../frontend/app/primitives/btn/index.scss      | 235 +++++++++++++++++++
 .../frontend/app/primitives/index.js            |   2 +
 .../frontend/app/primitives/modal/index.scss    | 179 ++++++++++++++
 .../frontend/app/primitives/table/index.scss    |  91 +++++++
 .../frontend/public/stylesheets/style.scss      |  99 ++------
 .../frontend/public/stylesheets/variables.scss  |   4 +
 .../frontend/views/includes/header-right.pug    |   4 +-
 .../web-console/frontend/views/signin.tpl.pug   |   2 +-
 41 files changed, 864 insertions(+), 618 deletions(-)
----------------------------------------------------------------------