You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/05/27 04:12:26 UTC
[6/7] accumulo git commit: ACCUMULO-3853 Explicitly track sendQueue
size
ACCUMULO-3853 Explicitly track sendQueue size
size() on ConcurrentLinkedQueue is a linear operation
on the number of nodes in the queue which stinks. Keep
an explicit count on the size to avoid this.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/caef59e4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/caef59e4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/caef59e4
Branch: refs/heads/1.7
Commit: caef59e4a61f23be5de5d609a9bc8f2dba8bde57
Parents: 5af27c2
Author: Josh Elser <jo...@gmail.com>
Authored: Tue May 26 18:42:34 2015 -0400
Committer: Josh Elser <jo...@gmail.com>
Committed: Tue May 26 21:00:12 2015 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/tracer/AsyncSpanReceiver.java | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/caef59e4/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
----------------------------------------------------------------------
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
index 9b8705a..d3a2fc5 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
@@ -16,6 +16,8 @@
*/
package org.apache.accumulo.tracer;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.AbstractQueue;
@@ -27,6 +29,7 @@ import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.trace.DistributedTrace;
import org.apache.accumulo.tracer.thrift.Annotation;
@@ -38,8 +41,6 @@ import org.apache.htrace.TimelineAnnotation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
/**
* Deliver Span information periodically to a destination.
* <ul>
@@ -69,6 +70,7 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
Timer timer = new Timer("SpanSender", true);
protected final AbstractQueue<RemoteSpan> sendQueue = new ConcurrentLinkedQueue<RemoteSpan>();
+ protected final AtomicInteger sendQueueSize = new AtomicInteger(0);
int maxQueueSize = 5000;
long lastNotificationOfDroppedSpans = 0;
int minSpanSize = 1;
@@ -112,6 +114,7 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
sendQueue.remove();
sendQueue.notifyAll();
}
+ sendQueueSize.decrementAndGet();
continue;
}
SpanKey dest = getSpanKey(s.data);
@@ -130,6 +133,7 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
sendQueue.remove();
sendQueue.notifyAll();
}
+ sendQueueSize.decrementAndGet();
sent = true;
} catch (Exception ex) {
log.warn("Got error sending to " + dest + ", refreshing client", ex);
@@ -168,7 +172,7 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
SpanKey dest = getSpanKey(data);
if (dest != null) {
List<Annotation> annotations = convertToAnnotations(s.getTimelineAnnotations());
- if (sendQueue.size() > maxQueueSize) {
+ if (sendQueueSize.get() > maxQueueSize) {
long now = System.currentTimeMillis();
if (now - lastNotificationOfDroppedSpans > 60 * 1000) {
log.warn("Tracing spans are being dropped because there are already " + maxQueueSize + " spans queued for delivery.\n"
@@ -179,6 +183,7 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
}
sendQueue.add(new RemoteSpan(host, service == null ? s.getProcessId() : service, s.getTraceId(), s.getSpanId(), s.getParentId(), s.getStartTimeMillis(),
s.getStopTimeMillis(), s.getDescription(), data, annotations));
+ sendQueueSize.incrementAndGet();
}
}