You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/24 13:36:21 UTC
[16/31] ignite git commit: Results printout for IO latency test and
new metrics
Results printout for IO latency test and new metrics
(cherry picked)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/018b25b2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/018b25b2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/018b25b2
Branch: refs/heads/ignite-5075
Commit: 018b25b29c3c491db7e44963e8c79677d77ceb23
Parents: 6f1dc3a
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Tue May 23 17:39:37 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue May 23 17:44:33 2017 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 380 +++++++++++++++----
.../communication/IgniteIoTestMessage.java | 362 +++++++++++++++++-
2 files changed, 672 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/018b25b2/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 147f94d..68bfd07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -18,9 +18,11 @@
package org.apache.ignite.internal.managers.communication;
import java.io.Serializable;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -64,12 +66,12 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
-import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -341,10 +343,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg;
+ msg0.senderNodeId(nodeId);
+
if (msg0.request()) {
IgniteIoTestMessage res = new IgniteIoTestMessage(msg0.id(), false, null);
res.flags(msg0.flags());
+ res.onRequestProcessed();
+
+ res.copyDataFromRequest(msg0);
try {
sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL);
@@ -356,10 +363,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
else {
IoTestFuture fut = ioTestMap().get(msg0.id());
+ msg0.onResponseProcessed();
+
if (fut == null)
U.warn(log, "Failed to find IO test future [msg=" + msg0 + ']');
else
- fut.onResponse();
+ fut.onResponse(msg0);
}
}
});
@@ -404,7 +413,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param procFromNioThread If {@code true} message is processed from NIO thread.
* @return Response future.
*/
- public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) {
+ public IgniteInternalFuture<List<IgniteIoTestMessage>> sendIoTest(
+ ClusterNode node,
+ byte[] payload,
+ boolean procFromNioThread
+ ) {
long id = ioTestId.getAndIncrement();
IoTestFuture fut = new IoTestFuture(id, 1);
@@ -445,7 +458,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param warmup Warmup duration in milliseconds.
* @param duration Test duration in milliseconds.
* @param threads Thread count.
- * @param maxLatency Max latency in nanoseconds.
+ * @param latencyLimit Max latency in nanoseconds.
* @param rangesCnt Ranges count in resulting histogram.
* @param payLoadSize Payload size in bytes.
* @param procFromNioThread {@code True} to process requests in NIO threads.
@@ -455,7 +468,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
final long warmup,
final long duration,
final int threads,
- final long maxLatency,
+ final long latencyLimit,
final int rangesCnt,
final int payLoadSize,
final boolean procFromNioThread,
@@ -469,8 +482,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
final LongAdder8 cnt = new LongAdder8();
final long sleepDuration = 5000;
final byte[] payLoad = new byte[payLoadSize];
- final Map<UUID, long[]>[] res = new Map[threads];
- final ConcurrentMap<UUID, GridAtomicLong> maxLatencies = new ConcurrentHashMap8<>();
+ final Map<UUID, IoTestThreadLocalNodeResults>[] res = new Map[threads];
boolean failed = true;
@@ -489,7 +501,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
"[warmup=" + warmup +
", duration=" + duration +
", threads=" + threads +
- ", maxLatency=" + maxLatency +
+ ", latencyLimit=" + latencyLimit +
", rangesCnt=" + rangesCnt +
", payLoadSize=" + payLoadSize +
", procFromNioThreads=" + procFromNioThread + ']'
@@ -529,22 +541,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
// At this point all threads have finished the test and
// stored data to the resulting array of maps.
// Need to iterate it over and sum values for all threads.
- Map<UUID, long[]> res0 = new HashMap<>();
-
- for (Map<UUID, long[]> r : res) {
- for (Entry<UUID, long[]> e : r.entrySet()) {
- long[] r0 = res0.get(e.getKey());
-
- if (r0 == null)
- res0.put(e.getKey(), e.getValue());
- else {
- for (int i = 0; i < rangesCnt + 1; i++)
- r0[i] += e.getValue()[i];
- }
- }
- }
-
- printIoTestResults(maxLatency / (1000 * rangesCnt), res0, maxLatencies);
+ printIoTestResults(res);
}
catch (InterruptedException | BrokenBarrierException e) {
U.error(log, "IO test failed.", e);
@@ -566,7 +563,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
boolean failed = true;
ThreadLocalRandom rnd = ThreadLocalRandom.current();
int size = nodes.size();
- Map<UUID, long[]> res0 = res[i0];
+ Map<UUID, IoTestThreadLocalNodeResults> res0 = res[i0];
try {
boolean warmupFinished0 = false;
@@ -582,38 +579,22 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
ClusterNode node = nodes.get(rnd.nextInt(size));
- long start = System.nanoTime();
-
- sendIoTest(node, payLoad, procFromNioThread).get();
-
- long latency = System.nanoTime() - start;
+ List<IgniteIoTestMessage> msgs = sendIoTest(node, payLoad, procFromNioThread).get();
cnt.increment();
- long[] latencies = res0.get(node.id());
-
- if (latencies == null)
- res0.put(node.id(), latencies = new long[rangesCnt + 1]);
-
- if (latency >= maxLatency) {
- latencies[rangesCnt]++; // Timed out.
+ for (IgniteIoTestMessage msg : msgs) {
+ UUID nodeId = msg.senderNodeId();
- GridAtomicLong maxLatency = maxLatencies.get(node.id());
+ assert nodeId != null;
- if (maxLatency == null) {
- GridAtomicLong old = maxLatencies.putIfAbsent(node.id(),
- maxLatency = new GridAtomicLong());
+ IoTestThreadLocalNodeResults nodeRes = res0.get(nodeId);
- if (old != null)
- maxLatency = old;
- }
+ if (nodeRes == null)
+ res0.put(nodeId,
+ nodeRes = new IoTestThreadLocalNodeResults(rangesCnt, latencyLimit));
- maxLatency.setIfGreater(latency);
- }
- else {
- int idx = (int)Math.floor((1.0 * latency) / ((1.0 * maxLatency) / rangesCnt));
-
- latencies[idx]++;
+ nodeRes.onResult(msg);
}
}
@@ -641,30 +622,44 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
- * @param binLatencyMcs Bin latency in microseconds.
- * @param res Resulting map.
- * @param maxLatencies Max latency for each node.
+ * @param rawRes Resulting map.
*/
private void printIoTestResults(
- long binLatencyMcs,
- Map<UUID, long[]> res,
- ConcurrentMap<UUID, GridAtomicLong> maxLatencies
+ Map<UUID, IoTestThreadLocalNodeResults>[] rawRes
) {
+ Map<UUID, IoTestNodeResults> res = new HashMap<>();
+
+ for (Map<UUID, IoTestThreadLocalNodeResults> r : rawRes) {
+ for (Entry<UUID, IoTestThreadLocalNodeResults> e : r.entrySet()) {
+ IoTestNodeResults r0 = res.get(e.getKey());
+
+ if (r0 == null)
+ res.put(e.getKey(), r0 = new IoTestNodeResults());
+
+ r0.add(e.getValue());
+ }
+ }
+
+ SimpleDateFormat dateFmt = new SimpleDateFormat("HH:mm:ss,SSS");
+
StringBuilder b = new StringBuilder(U.nl())
- .append("IO test results (round-trip count per each latency bin) " +
- "[binLatency=" + binLatencyMcs + "mcs]")
+ .append("IO test results (round-trip count per each latency bin).")
.append(U.nl());
- for (Entry<UUID, long[]> e : res.entrySet()) {
+ for (Entry<UUID, IoTestNodeResults> e : res.entrySet()) {
ClusterNode node = ctx.discovery().node(e.getKey());
+ long binLatencyMcs = e.getValue().binLatencyMcs();
+
b.append("Node ID: ").append(e.getKey()).append(" (addrs=")
- .append(node != null ? node.addresses().toString() : "n/a").append(')').append(U.nl());
+ .append(node != null ? node.addresses().toString() : "n/a")
+ .append(", binLatency=").append(binLatencyMcs).append("mcs")
+ .append(')').append(U.nl());
b.append("Latency bin, mcs | Count exclusive | Percentage exclusive | " +
"Count inclusive | Percentage inclusive ").append(U.nl());
- long[] nodeRes = e.getValue();
+ long[] nodeRes = e.getValue().resLatency;
long sum = 0;
@@ -688,15 +683,49 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
curSum, (100.0 * curSum) / sum));
}
- GridAtomicLong maxLatency = maxLatencies.get(e.getKey());
+ b.append(U.nl()).append("Total latency (ns): ").append(U.nl())
+ .append(String.format("%15d", e.getValue().totalLatency)).append(U.nl());
+
+ b.append(U.nl()).append("Max latencies (ns):").append(U.nl());
+ format(b, e.getValue().maxLatency, dateFmt);
+
+ b.append(U.nl()).append("Max request send queue times (ns):").append(U.nl());
+ format(b, e.getValue().maxReqSendQueueTime, dateFmt);
+
+ b.append(U.nl()).append("Max request receive queue times (ns):").append(U.nl());
+ format(b, e.getValue().maxReqRcvQueueTime, dateFmt);
+
+ b.append(U.nl()).append("Max response send queue times (ns):").append(U.nl());
+ format(b, e.getValue().maxResSendQueueTime, dateFmt);
+
+ b.append(U.nl()).append("Max response receive queue times (ns):").append(U.nl());
+ format(b, e.getValue().maxResRcvQueueTime, dateFmt);
- b.append("Max latency (ns): ").append(maxLatency != null ? maxLatency.get() : -1).append(U.nl());
+ b.append(U.nl()).append("Max request wire times (millis):").append(U.nl());
+ format(b, e.getValue().maxReqWireTimeMillis, dateFmt);
+
+ b.append(U.nl()).append("Max response wire times (millis):").append(U.nl());
+ format(b, e.getValue().maxResWireTimeMillis, dateFmt);
+
+ b.append(U.nl());
}
if (log.isInfoEnabled())
log.info(b.toString());
}
+ /**
+ * Appends one line per pair to the builder: the first element right-aligned in a
+ * 15-character column, a single space, the second element rendered as a date, then a
+ * line separator.
+ *
+ * @param b Builder to append to.
+ * @param pairs Pairs to format; the second element of each pair is passed to {@link Date#Date(long)}.
+ * @param dateFmt Formatter applied to the second element of each pair.
+ */
+ private void format(StringBuilder b, Collection<IgnitePair<Long>> pairs, SimpleDateFormat dateFmt) {
+ for (IgnitePair<Long> pair : pairs) {
+ String val = String.format("%15d", pair.get1());
+
+ b.append(val);
+ b.append(" ");
+
+ String ts = dateFmt.format(new Date(pair.get2()));
+
+ b.append(ts);
+ b.append(U.nl());
+ }
+ }
+
/** {@inheritDoc} */
@SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"})
@Override public void onKernalStart0() throws IgniteCheckedException {
@@ -2857,12 +2886,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/**
*
*/
- private class IoTestFuture extends GridFutureAdapter<Object> {
+ private class IoTestFuture extends GridFutureAdapter<List<IgniteIoTestMessage>> {
/** */
private final long id;
/** */
- private int cntr;
+ private final int cntr;
+
+ /** */
+ private final List<IgniteIoTestMessage> ress;
/**
* @param id ID.
@@ -2873,24 +2905,28 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
this.id = id;
this.cntr = cntr;
+
+ ress = new ArrayList<>(cntr);
}
/**
*
*/
- void onResponse() {
+ void onResponse(IgniteIoTestMessage res) {
boolean complete;
synchronized (this) {
- complete = --cntr == 0;
+ ress.add(res);
+
+ complete = cntr == ress.size();
}
if (complete)
- onDone();
+ onDone(ress);
}
/** {@inheritDoc} */
- @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+ @Override public boolean onDone(List<IgniteIoTestMessage> res, @Nullable Throwable err) {
if (super.onDone(res, err)) {
ioTestMap().remove(id);
@@ -2905,4 +2941,210 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return S.toString(IoTestFuture.class, this);
}
}
+
+ /**
+ * Accumulator of IO test measurements for a single remote node: a latency histogram,
+ * the summed latency and, for each timing stage, the largest value observed together with
+ * the wall-clock time at which it was recorded. The class performs no synchronization.
+ */
+ private static class IoTestThreadLocalNodeResults {
+ /** Latency histogram; the last element counts results at or above {@link #latencyLimit}. */
+ private final long[] resLatency;
+
+ /** Number of regular histogram bins (the overflow bin is not counted). */
+ private final int rangesCnt;
+
+ /** Sum of all recorded round-trip latencies. */
+ private long totalLatency;
+
+ /** Largest round-trip latency seen so far. */
+ private long maxLatency;
+
+ /** {@link System#currentTimeMillis()} at the moment {@link #maxLatency} was recorded. */
+ private long maxLatencyTs;
+
+ /** Largest (request send - request create) interval seen so far. */
+ private long maxReqSendQueueTime;
+
+ /** {@link System#currentTimeMillis()} at the moment {@link #maxReqSendQueueTime} was recorded. */
+ private long maxReqSendQueueTimeTs;
+
+ /** Largest (request process - request receive) interval seen so far. */
+ private long maxReqRcvQueueTime;
+
+ /** {@link System#currentTimeMillis()} at the moment {@link #maxReqRcvQueueTime} was recorded. */
+ private long maxReqRcvQueueTimeTs;
+
+ /** Largest (response send - request process) interval seen so far. */
+ private long maxResSendQueueTime;
+
+ /** {@link System#currentTimeMillis()} at the moment {@link #maxResSendQueueTime} was recorded. */
+ private long maxResSendQueueTimeTs;
+
+ /** Largest (response processed - response receive) interval seen so far. */
+ private long maxResRcvQueueTime;
+
+ /** {@link System#currentTimeMillis()} at the moment {@link #maxResRcvQueueTime} was recorded. */
+ private long maxResRcvQueueTimeTs;
+
+ /** Largest (request received - request send) wall-clock interval seen so far, in milliseconds. */
+ private long maxReqWireTimeMillis;
+
+ /** {@link System#currentTimeMillis()} at the moment {@link #maxReqWireTimeMillis} was recorded. */
+ private long maxReqWireTimeTs;
+
+ /** Largest (response received - response send) wall-clock interval seen so far, in milliseconds. */
+ private long maxResWireTimeMillis;
+
+ /** {@link System#currentTimeMillis()} at the moment {@link #maxResWireTimeMillis} was recorded. */
+ private long maxResWireTimeTs;
+
+ /** Latency at or above which a result is counted in the overflow bin. */
+ private final long latencyLimit;
+
+ /**
+ * @param rangesCnt Ranges count.
+ * @param latencyLimit Latency limit; results at or above it go to the overflow bin.
+ * Must be in the same unit as the message timestamps it is compared with.
+ */
+ public IoTestThreadLocalNodeResults(int rangesCnt, long latencyLimit) {
+ this.rangesCnt = rangesCnt;
+ this.latencyLimit = latencyLimit;
+
+ // One extra slot for results that reached the latency limit.
+ resLatency = new long[rangesCnt + 1];
+ }
+
+ /**
+ * Records the timings carried by a processed response: updates the histogram, the total
+ * latency and every per-stage maximum that the message exceeds.
+ *
+ * @param msg Response message with all timestamps filled in.
+ */
+ public void onResult(IgniteIoTestMessage msg) {
+ // Single wall-clock stamp shared by every maximum updated from this message.
+ long now = System.currentTimeMillis();
+
+ long latency = msg.responseProcessedTs() - msg.requestCreateTs();
+
+ int idx = latency >= latencyLimit ?
+ rangesCnt /* Timed out. */ :
+ (int)Math.floor((1.0 * latency) / ((1.0 * latencyLimit) / rangesCnt));
+
+ resLatency[idx]++;
+
+ totalLatency += latency;
+
+ if (maxLatency < latency) {
+ maxLatency = latency;
+ maxLatencyTs = now;
+ }
+
+ long reqSndQueueTime = msg.requestSendTs() - msg.requestCreateTs();
+
+ if (maxReqSendQueueTime < reqSndQueueTime) {
+ maxReqSendQueueTime = reqSndQueueTime;
+ maxReqSendQueueTimeTs = now;
+ }
+
+ long reqRcvQueueTime = msg.requestProcessTs() - msg.requestReceiveTs();
+
+ if (maxReqRcvQueueTime < reqRcvQueueTime) {
+ maxReqRcvQueueTime = reqRcvQueueTime;
+ maxReqRcvQueueTimeTs = now;
+ }
+
+ long resSndQueueTime = msg.responseSendTs() - msg.requestProcessTs();
+
+ if (maxResSendQueueTime < resSndQueueTime) {
+ maxResSendQueueTime = resSndQueueTime;
+ maxResSendQueueTimeTs = now;
+ }
+
+ long resRcvQueueTime = msg.responseProcessedTs() - msg.responseReceiveTs();
+
+ if (maxResRcvQueueTime < resRcvQueueTime) {
+ maxResRcvQueueTime = resRcvQueueTime;
+ maxResRcvQueueTimeTs = now;
+ }
+
+ // Wire times compare wall clocks read on two different nodes.
+ long reqWireTimeMillis = msg.requestReceivedTsMillis() - msg.requestSendTsMillis();
+
+ if (maxReqWireTimeMillis < reqWireTimeMillis) {
+ maxReqWireTimeMillis = reqWireTimeMillis;
+ maxReqWireTimeTs = now;
+ }
+
+ // Response wire time is measured from the response send, not from the request send
+ // (the latter would yield the whole round trip instead of the response leg).
+ long resWireTimeMillis = msg.responseRecievedTsMillis() - msg.responseSendTsMillis();
+
+ if (maxResWireTimeMillis < resWireTimeMillis) {
+ maxResWireTimeMillis = resWireTimeMillis;
+ maxResWireTimeTs = now;
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static class IoTestNodeResults {
+ /** */
+ private long latencyLimit;
+
+ /** */
+ private long[] resLatency;
+
+ /** */
+ private long totalLatency;
+
+ /** */
+ private Collection<IgnitePair<Long>> maxLatency = new ArrayList<>();
+
+ /** */
+ private Collection<IgnitePair<Long>> maxReqSendQueueTime = new ArrayList<>();
+
+ /** */
+ private Collection<IgnitePair<Long>> maxReqRcvQueueTime = new ArrayList<>();
+
+ /** */
+ private Collection<IgnitePair<Long>> maxResSendQueueTime = new ArrayList<>();
+
+ /** */
+ private Collection<IgnitePair<Long>> maxResRcvQueueTime = new ArrayList<>();
+
+ /** */
+ private Collection<IgnitePair<Long>> maxReqWireTimeMillis = new ArrayList<>();
+
+ /** */
+ private Collection<IgnitePair<Long>> maxResWireTimeMillis = new ArrayList<>();
+
+ /**
+ * @param res Node results to add.
+ */
+ public void add(IoTestThreadLocalNodeResults res) {
+ if (resLatency == null) {
+ resLatency = res.resLatency.clone();
+ latencyLimit = res.latencyLimit;
+ }
+ else {
+ assert latencyLimit == res.latencyLimit;
+ assert resLatency.length == res.resLatency.length;
+
+ for (int i = 0; i < resLatency.length; i++)
+ resLatency[i] += res.resLatency[i];
+ }
+
+ totalLatency += res.totalLatency;
+
+ maxLatency.add(F.pair(res.maxLatency, res.maxLatencyTs));
+ maxReqSendQueueTime.add(F.pair(res.maxReqSendQueueTime, res.maxReqSendQueueTimeTs));
+ maxReqRcvQueueTime.add(F.pair(res.maxReqRcvQueueTime, res.maxReqRcvQueueTimeTs));
+ maxResSendQueueTime.add(F.pair(res.maxResSendQueueTime, res.maxResSendQueueTimeTs));
+ maxResRcvQueueTime.add(F.pair(res.maxResRcvQueueTime, res.maxResRcvQueueTimeTs));
+ maxReqWireTimeMillis.add(F.pair(res.maxReqWireTimeMillis, res.maxReqWireTimeTs));
+ maxResWireTimeMillis.add(F.pair(res.maxResWireTimeMillis, res.maxResWireTimeTs));
+ }
+
+ /**
+ * @return Bin latency in microseconds.
+ */
+ public long binLatencyMcs() {
+ if (resLatency == null)
+ throw new IllegalStateException();
+
+ return latencyLimit / (1000 * (resLatency.length - 1));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/018b25b2/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
index 0a41622..3e0fa76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.managers.communication;
import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -45,6 +47,43 @@ public class IgniteIoTestMessage implements Message {
/** */
private byte payload[];
+ /** */
+ private long reqCreateTs;
+
+ /** */
+ private long reqSndTs;
+
+ /** */
+ private long reqSndTsMillis;
+
+ /** */
+ private long reqRcvTs;
+
+ /** */
+ private long reqRcvTsMillis;
+
+ /** */
+ private long reqProcTs;
+
+ /** */
+ private long resSndTs;
+
+ /** */
+ private long resSndTsMillis;
+
+ /** */
+ private long resRcvTs;
+
+ /** */
+ private long resRcvTsMillis;
+
+ /** */
+ private long resProcTs;
+
+ /** */
+ @GridDirectTransient
+ private UUID sndNodeId;
+
/**
*
*/
@@ -61,6 +100,8 @@ public class IgniteIoTestMessage implements Message {
this.id = id;
this.req = req;
this.payload = payload;
+
+ reqCreateTs = System.nanoTime();
}
/**
@@ -126,10 +167,173 @@ public class IgniteIoTestMessage implements Message {
return id;
}
+ /**
+ * @return Request create timestamp, a {@link System#nanoTime()} reading.
+ */
+ public long requestCreateTs() {
+ return reqCreateTs;
+ }
+
+ /**
+ * @return Request send timestamp, a {@link System#nanoTime()} reading stamped by
+ * {@link #onBeforeWrite()}; {@code 0} if not stamped yet.
+ */
+ public long requestSendTs() {
+ return reqSndTs;
+ }
+
+ /**
+ * @return Request receive timestamp, a {@link System#nanoTime()} reading stamped by
+ * {@link #onAfterRead()}; {@code 0} if not stamped yet.
+ */
+ public long requestReceiveTs() {
+ return reqRcvTs;
+ }
+
+ /**
+ * @return Request process started timestamp, a {@link System#nanoTime()} reading stamped by
+ * {@link #onRequestProcessed()}.
+ */
+ public long requestProcessTs() {
+ return reqProcTs;
+ }
+
+ /**
+ * @return Response send timestamp, a {@link System#nanoTime()} reading stamped by
+ * {@link #onBeforeWrite()}; {@code 0} if not stamped yet.
+ */
+ public long responseSendTs() {
+ return resSndTs;
+ }
+
+ /**
+ * @return Response receive timestamp, a {@link System#nanoTime()} reading stamped by
+ * {@link #onAfterRead()}; {@code 0} if not stamped yet.
+ */
+ public long responseReceiveTs() {
+ return resRcvTs;
+ }
+
+ /**
+ * Returns the same field as {@link #responseProcessedTs()}.
+ *
+ * @return Response process timestamp, a {@link System#nanoTime()} reading stamped by
+ * {@link #onResponseProcessed()}.
+ */
+ public long responseProcessTs() {
+ return resProcTs;
+ }
+
+ /**
+ * @return Request send timestamp (millis), a {@link System#currentTimeMillis()} reading
+ * stamped by {@link #onBeforeWrite()}.
+ */
+ public long requestSendTsMillis() {
+ return reqSndTsMillis;
+ }
+
+ /**
+ * @return Request received timestamp (millis), a {@link System#currentTimeMillis()} reading
+ * stamped by {@link #onAfterRead()}.
+ */
+ public long requestReceivedTsMillis() {
+ return reqRcvTsMillis;
+ }
+
+ /**
+ * @return Response send timestamp (millis), a {@link System#currentTimeMillis()} reading
+ * stamped by {@link #onBeforeWrite()}.
+ */
+ public long responseSendTsMillis() {
+ return resSndTsMillis;
+ }
+
+ /**
+ * NOTE(review): the method name misspells "Received"; it is kept because callers use it.
+ *
+ * @return Response received timestamp (millis), a {@link System#currentTimeMillis()} reading
+ * stamped by {@link #onAfterRead()}.
+ */
+ public long responseRecievedTsMillis() {
+ return resRcvTsMillis;
+ }
+
+ /**
+ * Stamps the receive time (nano and millis) on first invocation: the request receive
+ * fields for a request message, the response receive fields otherwise. A non-zero nano
+ * stamp is never overwritten.
+ * TODO: introduce direct message lifecycle API?
+ */
+ public void onAfterRead() {
+ if (req) {
+ if (reqRcvTs == 0) {
+ reqRcvTs = System.nanoTime();
+
+ reqRcvTsMillis = System.currentTimeMillis();
+ }
+ }
+ else if (resRcvTs == 0) {
+ resRcvTs = System.nanoTime();
+
+ resRcvTsMillis = System.currentTimeMillis();
+ }
+ }
+
+ /**
+ * Stamps the send time (nano and millis) on first invocation: the request send fields
+ * for a request message, the response send fields otherwise. A non-zero nano stamp is
+ * never overwritten, so repeated invocations keep the first reading.
+ * TODO: introduce direct message lifecycle API?
+ */
+ public void onBeforeWrite() {
+ if (req) {
+ if (reqSndTs == 0) {
+ reqSndTs = System.nanoTime();
+
+ reqSndTsMillis = System.currentTimeMillis();
+ }
+ }
+ else if (resSndTs == 0) {
+ resSndTs = System.nanoTime();
+
+ resSndTsMillis = System.currentTimeMillis();
+ }
+ }
+
+ /**
+ * Copies the request-side timestamps (create, send and receive, including the millis
+ * variants of send and receive) from the given message into this one, overwriting the
+ * current values.
+ *
+ * @param req Message to copy the request timestamps from.
+ */
+ public void copyDataFromRequest(IgniteIoTestMessage req) {
+ reqCreateTs = req.reqCreateTs;
+
+ reqSndTs = req.reqSndTs;
+ reqSndTsMillis = req.reqSndTsMillis;
+
+ reqRcvTs = req.reqRcvTs;
+ reqRcvTsMillis = req.reqRcvTsMillis;
+ }
+
+ /**
+ * Stamps the request process time with the current {@link System#nanoTime()} reading,
+ * overwriting any previous value.
+ */
+ public void onRequestProcessed() {
+ reqProcTs = System.nanoTime();
+ }
+
+ /**
+ * Stamps the response process time with the current {@link System#nanoTime()} reading,
+ * overwriting any previous value.
+ */
+ public void onResponseProcessed() {
+ resProcTs = System.nanoTime();
+ }
+
+ /**
+ * Returns the same field as {@link #responseProcessTs()}.
+ *
+ * @return Response processed timestamp, a {@link System#nanoTime()} reading stamped by
+ * {@link #onResponseProcessed()}.
+ */
+ public long responseProcessedTs() {
+ return resProcTs;
+ }
+
+ /**
+ * @return Sender node ID as assigned through {@link #senderNodeId(UUID)}, or {@code null}
+ * if it has not been assigned.
+ */
+ public UUID senderNodeId() {
+ return sndNodeId;
+ }
+
+ /**
+ * Assigns the sender node ID. The backing field is annotated {@code @GridDirectTransient}.
+ *
+ * @param sndNodeId Sender node ID.
+ */
+ public void senderNodeId(UUID sndNodeId) {
+ this.sndNodeId = sndNodeId;
+ }
+
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
+ onBeforeWrite();
+
if (!writer.isHeaderWritten()) {
if (!writer.writeHeader(directType(), fieldsCount()))
return false;
@@ -162,6 +366,72 @@ public class IgniteIoTestMessage implements Message {
writer.incrementState();
+ case 4:
+ if (!writer.writeLong("reqCreateTs", reqCreateTs))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeLong("reqProcTs", reqProcTs))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeLong("reqRcvTs", reqRcvTs))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeLong("reqRcvTsMillis", reqRcvTsMillis))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeLong("reqSndTs", reqSndTs))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeLong("reqSndTsMillis", reqSndTsMillis))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeLong("resProcTs", resProcTs))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeLong("resRcvTs", resRcvTs))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeLong("resRcvTsMillis", resRcvTsMillis))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
+ if (!writer.writeLong("resSndTs", resSndTs))
+ return false;
+
+ writer.incrementState();
+
+ case 14:
+ if (!writer.writeLong("resSndTsMillis", resSndTsMillis))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -207,8 +477,98 @@ public class IgniteIoTestMessage implements Message {
reader.incrementState();
+ case 4:
+ reqCreateTs = reader.readLong("reqCreateTs");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ reqProcTs = reader.readLong("reqProcTs");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ reqRcvTs = reader.readLong("reqRcvTs");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ reqRcvTsMillis = reader.readLong("reqRcvTsMillis");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ reqSndTs = reader.readLong("reqSndTs");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ reqSndTsMillis = reader.readLong("reqSndTsMillis");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ resProcTs = reader.readLong("resProcTs");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ resRcvTs = reader.readLong("resRcvTs");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
+ resRcvTsMillis = reader.readLong("resRcvTsMillis");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
+ resSndTs = reader.readLong("resSndTs");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 14:
+ resSndTsMillis = reader.readLong("resSndTsMillis");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
+ onAfterRead();
+
return reader.afterMessageRead(IgniteIoTestMessage.class);
}
@@ -219,7 +579,7 @@ public class IgniteIoTestMessage implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 4;
+ return 15;
}
/** {@inheritDoc} */