You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/05/15 15:03:37 UTC
[1/2] ignite git commit: Added IO latency test + made it available
from MBean
Repository: ignite
Updated Branches:
refs/heads/master feda3ff52 -> 5b16b51c8
Added IO latency test + made it available from MBean
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8195ae06
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8195ae06
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8195ae06
Branch: refs/heads/master
Commit: 8195ae0655255979d02d425a6ca11fb720d460d3
Parents: ed4a7a1
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Mon May 15 18:03:07 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon May 15 18:03:07 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 14 ++
.../managers/communication/GridIoManager.java | 206 ++++++++++++++++++-
.../org/apache/ignite/mxbean/IgniteMXBean.java | 44 ++++
3 files changed, 263 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ae06/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8ba6a88..2c8c964 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -3960,6 +3960,20 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/** {@inheritDoc} */
+    @Override public void runIoTest(
+        long warmup,
+        long duration,
+        int threads,
+        long maxLatency,
+        int rangesCnt,
+        int payLoadSize,
+        boolean procFromNioThread
+    ) {
+        // Copy the remote server nodes into a typed list (no raw type) so IO manager gets a List<ClusterNode>.
+        ctx.io().runIoTest(warmup, duration, threads, maxLatency, rangesCnt, payLoadSize, procFromNioThread,
+            new ArrayList<>(ctx.cluster().get().forServers().forRemotes().nodes()));
+    }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteKernal.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ae06/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9d35c75..fc94667 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -21,17 +21,23 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -86,6 +92,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
+import org.jsr166.LongAdder8;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -99,11 +106,11 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGF
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SCHEMA_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SERVICE_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.isReservedGridIoPolicy;
import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.threadProcessingMessage;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
@@ -433,6 +440,203 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return map;
}
+    /**
+     * Starts an IO latency test and returns without waiting for it to finish.
+     * <p>
+     * {@code threads} worker tasks plus one monitoring task are submitted to a dedicated thread pool.
+     * Each worker repeatedly picks a random node from {@code nodes}, calls {@code sendIoTest(...)} and waits
+     * on the returned future; the elapsed time is stored in a per-node histogram that has {@code rangesCnt}
+     * equal ranges covering {@code [0, maxLatency)} plus one extra slot for operations that took
+     * {@code maxLatency} or longer. Samples taken before the warmup is over are counted in the throughput
+     * log only and do not get into the histogram.
+     * <p>
+     * The monitoring task wakes up every 5 seconds to log throughput, switches the test from warmup to
+     * measurement, stops the workers once {@code duration} has elapsed and finally logs the merged
+     * histograms at INFO level. Because of the 5 second sleep both {@code warmup} and {@code duration}
+     * are honored with that granularity.
+     * <p>
+     * NOTE(review): if a worker fails after the start barrier, {@code bar.reset()} does not wake up the
+     * monitoring task (it is sleeping, not waiting on the barrier), so the final {@code await()} of the
+     * monitor and of the surviving workers never completes. Consider a timed await or an explicit failure flag.
+     *
+     * @param warmup Warmup duration in milliseconds.
+     * @param duration Test duration in milliseconds.
+     * @param threads Thread count.
+     * @param maxLatency Max latency in nanoseconds.
+     * @param rangesCnt Ranges count in resulting histogram.
+     * @param payLoadSize Payload size in bytes.
+     * @param procFromNioThread {@code True} to process requests in NIO threads.
+     * @param nodes Nodes participating in test.
+     * @throws IllegalArgumentException If {@code threads} or {@code rangesCnt} is not positive
+     *      or {@code payLoadSize} is negative.
+     */
+    public void runIoTest(
+        final long warmup,
+        final long duration,
+        final int threads,
+        final long maxLatency,
+        final int rangesCnt,
+        final int payLoadSize,
+        final boolean procFromNioThread,
+        final List<ClusterNode> nodes
+    ) {
+        // Fail fast on the caller thread: otherwise bad arguments surface inside pool threads only.
+        if (threads <= 0)
+            throw new IllegalArgumentException("Threads count must be positive: " + threads);
+
+        if (rangesCnt <= 0)
+            throw new IllegalArgumentException("Ranges count must be positive: " + rangesCnt);
+
+        if (payLoadSize < 0)
+            throw new IllegalArgumentException("Payload size must not be negative: " + payLoadSize);
+
+        // With no target nodes every worker would fail in rnd.nextInt(0) and the monitor would hang.
+        if (nodes == null || nodes.isEmpty()) {
+            U.warn(log, "IO test is not started since there are no nodes to send requests to.");
+
+            return;
+        }
+
+        final ExecutorService svc = Executors.newFixedThreadPool(threads + 1);
+
+        final AtomicBoolean warmupFinished = new AtomicBoolean();
+        final AtomicBoolean done = new AtomicBoolean();
+        // All workers and the monitor meet on this barrier twice: before the test and after it.
+        final CyclicBarrier bar = new CyclicBarrier(threads + 1);
+        final LongAdder8 cnt = new LongAdder8();
+        final long sleepDuration = 5000;
+        final byte[] payLoad = new byte[payLoadSize];
+        // One result map per worker, so workers never share mutable state while the test is running.
+        @SuppressWarnings("unchecked")
+        final Map<UUID, long[]>[] res = new Map[threads];
+
+        boolean failed = true;
+
+        try {
+            // Monitoring task.
+            svc.execute(new Runnable() {
+                @Override public void run() {
+                    boolean failed = true;
+
+                    try {
+                        bar.await();
+
+                        long start = System.currentTimeMillis();
+
+                        if (log.isInfoEnabled())
+                            log.info("IO test started " +
+                                "[warmup=" + warmup +
+                                ", duration=" + duration +
+                                ", threads=" + threads +
+                                ", maxLatency=" + maxLatency +
+                                ", rangesCnt=" + rangesCnt +
+                                ", payLoadSize=" + payLoadSize +
+                                ", procFromNioThreads=" + procFromNioThread + ']'
+                            );
+
+                        for (;;) {
+                            if (!warmupFinished.get() && System.currentTimeMillis() - start > warmup) {
+                                if (log.isInfoEnabled())
+                                    log.info("IO test warmup finished.");
+
+                                warmupFinished.set(true);
+
+                                // Measurement phase is timed from the end of the warmup.
+                                start = System.currentTimeMillis();
+                            }
+
+                            if (warmupFinished.get() && System.currentTimeMillis() - start > duration) {
+                                if (log.isInfoEnabled())
+                                    log.info("IO test finished, will wait for all threads to finish.");
+
+                                done.set(true);
+
+                                bar.await();
+
+                                failed = false;
+
+                                break;
+                            }
+
+                            if (log.isInfoEnabled())
+                                log.info("IO test [opsCnt/sec=" + (cnt.sumThenReset() * 1000 / sleepDuration) +
+                                    ", warmup=" + !warmupFinished.get() +
+                                    ", elapsed=" + (System.currentTimeMillis() - start) + ']');
+
+                            Thread.sleep(sleepDuration);
+                        }
+
+                        // At this point all threads have finished the test and stored data to the result map.
+                        Map<UUID, long[]> res0 = new HashMap<>();
+
+                        for (Map<UUID, long[]> r : res) {
+                            for (Entry<UUID, long[]> e : r.entrySet()) {
+                                long[] r0 = res0.get(e.getKey());
+
+                                if (r0 == null)
+                                    res0.put(e.getKey(), e.getValue());
+                                else {
+                                    long[] add = e.getValue();
+
+                                    for (int i = 0; i < rangesCnt + 1; i++)
+                                        r0[i] += add[i];
+                                }
+                            }
+                        }
+
+                        // Long arithmetic: 1000 * rangesCnt may overflow int for large ranges count.
+                        StringBuilder b = new StringBuilder("IO test results " +
+                            "[range=" + (maxLatency / (1000L * rangesCnt)) + "mcs]");
+
+                        b.append(U.nl());
+
+                        for (Entry<UUID, long[]> e : res0.entrySet()) {
+                            ClusterNode node = ctx.discovery().node(e.getKey());
+
+                            b.append("    ").append(e.getKey()).append(" (addrs=")
+                                .append(node != null ? node.addresses().toString() : "n/a").append(')')
+                                .append(Arrays.toString(e.getValue())).append(U.nl());
+                        }
+
+                        if (log.isInfoEnabled())
+                            log.info(b.toString());
+                    }
+                    catch (InterruptedException e) {
+                        // Preserve interrupt status for the code that owns this thread.
+                        Thread.currentThread().interrupt();
+
+                        U.error(log, "IO test failed.", e);
+                    }
+                    catch (BrokenBarrierException e) {
+                        U.error(log, "IO test failed.", e);
+                    }
+                    finally {
+                        // Release the parties that are waiting on the barrier right now.
+                        if (failed)
+                            bar.reset();
+                    }
+                }
+            });
+
+            for (int i = 0; i < threads; i++) {
+                final int i0 = i;
+
+                res[i] = U.newHashMap(nodes.size());
+
+                // Worker task.
+                svc.execute(new Runnable() {
+                    @Override public void run() {
+                        boolean failed = true;
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+                        int size = nodes.size();
+                        Map<UUID, long[]> res0 = res[i0];
+
+                        try {
+                            boolean warmupFinished0 = false;
+
+                            bar.await();
+
+                            for (;;) {
+                                if (done.get())
+                                    break;
+
+                                // Local copy avoids a volatile read per operation once warmup is over.
+                                if (!warmupFinished0)
+                                    warmupFinished0 = warmupFinished.get();
+
+                                ClusterNode node = nodes.get(rnd.nextInt(size));
+
+                                long start = System.nanoTime();
+
+                                sendIoTest(node, payLoad, procFromNioThread).get();
+
+                                long latency = System.nanoTime() - start;
+
+                                cnt.increment();
+
+                                // Warmup operations are reported in throughput only, not in the histogram.
+                                if (!warmupFinished0)
+                                    continue;
+
+                                long[] latencies = res0.get(node.id());
+
+                                if (latencies == null)
+                                    res0.put(node.id(), latencies = new long[rangesCnt + 1]);
+
+                                if (latency >= maxLatency)
+                                    latencies[rangesCnt]++; // Timed out.
+                                else {
+                                    int idx = (int)Math.floor((1.0 * latency) / ((1.0 * maxLatency) / rangesCnt));
+
+                                    latencies[idx]++;
+                                }
+                            }
+
+                            bar.await();
+
+                            failed = false;
+                        }
+                        catch (Exception e) {
+                            if (e instanceof InterruptedException)
+                                Thread.currentThread().interrupt();
+
+                            U.error(log, "IO test worker thread failed.", e);
+                        }
+                        finally {
+                            if (failed)
+                                bar.reset();
+                        }
+                    }
+                });
+            }
+
+            failed = false;
+        }
+        finally {
+            if (failed)
+                U.shutdownNow(GridIoManager.class, svc, log);
+            else
+                // Already submitted tasks keep running; idle pool threads exit once they are done
+                // instead of leaking threads + 1 non-daemon threads per invocation.
+                svc.shutdown();
+        }
+    }
+
/** {@inheritDoc} */
@SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"})
@Override public void onKernalStart0() throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ae06/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
index bf84b0d..ce63e4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
@@ -385,4 +385,48 @@ public interface IgniteMXBean {
*/
@MXBeanDescription("Dumps debug information for the current node.")
public void dumpDebugInfo();
+
+ /**
+ * Runs IO latency test against all remote server nodes in cluster.
+ * <p>
+ * In {@code IgniteKernal} this call is delegated to {@code GridIoManager.runIoTest(...)} with the list of
+ * remote server nodes. That method submits the test to its own thread pool, so the operation returns
+ * nothing to the MBean caller; progress and the resulting per-node latency histograms are written to
+ * the node log at INFO level.
+ *
+ * @param warmup Warmup duration in milliseconds.
+ * @param duration Test duration in milliseconds.
+ * @param threads Thread count.
+ * @param maxLatency Max latency in nanoseconds. Operations that take this long or longer are counted
+ * in a separate, last histogram slot.
+ * @param rangesCnt Ranges count in resulting histogram.
+ * @param payLoadSize Payload size in bytes.
+ * @param procFromNioThread {@code True} to process requests in NIO threads.
+ */
+ @MXBeanDescription("Runs IO latency test against all remote server nodes in cluster.")
+ @MXBeanParametersNames(
+ {
+ "warmup",
+ "duration",
+ "threads",
+ "maxLatency",
+ "rangesCnt",
+ "payLoadSize",
+ "procFromNioThread"
+ }
+ )
+ @MXBeanParametersDescriptions(
+ {
+ "Warmup duration (millis).",
+ "Test duration (millis).",
+ "Threads count.",
+ "Maximum latency expected (nanos).",
+ "Ranges count for histogram.",
+ "Payload size (bytes).",
+ "Process requests in NIO-threads flag."
+ }
+ )
+ void runIoTest(
+ long warmup,
+ long duration,
+ int threads,
+ long maxLatency,
+ int rangesCnt,
+ int payLoadSize,
+ boolean procFromNioThread
+ );
}
[2/2] ignite git commit: Merge remote-tracking branch 'origin/master'
Posted by yz...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5b16b51c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5b16b51c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5b16b51c
Branch: refs/heads/master
Commit: 5b16b51c8779ed3d129ac45621e73dd903fe65dd
Parents: 8195ae0 feda3ff
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Mon May 15 18:03:30 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon May 15 18:03:30 2017 +0300
----------------------------------------------------------------------
.../DistributedRegressionExample.java | 149 ++++++++++++
.../processors/cache/GridCacheAdapter.java | 71 +-----
.../cache/GridCacheConcurrentMap.java | 7 +-
.../cache/GridCacheConcurrentMapImpl.java | 30 +--
.../processors/cache/GridCacheProxyImpl.java | 60 -----
.../processors/cache/GridNoStorageCacheMap.java | 7 +-
.../processors/cache/IgniteInternalCache.java | 61 -----
.../dht/GridCachePartitionedConcurrentMap.java | 13 +-
.../distributed/dht/GridDhtCacheAdapter.java | 162 -------------
.../distributed/dht/GridDhtLocalPartition.java | 4 +-
.../dht/GridDhtPartitionTopologyImpl.java | 2 +-
.../distributed/near/GridNearCacheAdapter.java | 17 --
.../processors/cache/GridCacheLeakTest.java | 4 +-
.../GridCachePartitionedFullApiSelfTest.java | 18 --
.../cache/eviction/EvictionAbstractTest.java | 13 +-
.../IgniteCacheClientNearCacheExpiryTest.java | 2 +-
.../hadoop/jobtracker/HadoopJobTracker.java | 20 +-
.../cache/hibernate/HibernateCacheProxy.java | 25 --
.../apache/ignite/ml/math/util/MatrixUtil.java | 3 +-
.../org/apache/ignite/ml/IgniteMLTestSuite.java | 35 +++
.../ml/math/MathImplDistributedTestSuite.java | 2 +-
.../ignite/ml/math/MathImplLocalTestSuite.java | 7 +-
.../ignite/ml/math/MathImplMainTestSuite.java | 2 +-
.../OLSMultipleLinearRegressionTest.java | 7 +
.../ml/regressions/RegressionsTestSuite.java | 32 +++
.../Binary/BinarySelfTest.cs | 26 ++
.../Impl/Binary/BinaryReflectiveActions.cs | 6 +-
.../BinaryReflectiveSerializerInternal.cs | 9 +-
.../Impl/Binary/BinaryUtils.cs | 9 -
.../Binary/DeserializationCallbackProcessor.cs | 11 +
.../Impl/Binary/SerializableSerializer.cs | 10 +-
.../activities-user-dialog.tpl.pug | 33 +--
.../components/web-console-header/style.scss | 9 -
.../frontend/app/primitives/btn/index.scss | 235 +++++++++++++++++++
.../frontend/app/primitives/index.js | 2 +
.../frontend/app/primitives/modal/index.scss | 179 ++++++++++++++
.../frontend/app/primitives/table/index.scss | 91 +++++++
.../frontend/public/stylesheets/style.scss | 99 ++------
.../frontend/public/stylesheets/variables.scss | 4 +
.../frontend/views/includes/header-right.pug | 4 +-
.../web-console/frontend/views/signin.tpl.pug | 2 +-
41 files changed, 864 insertions(+), 618 deletions(-)
----------------------------------------------------------------------