You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bi...@apache.org on 2015/05/29 22:40:31 UTC
accumulo git commit: ACCUMULO-3862 improved how AsyncSpanReceiver
drops short spans, added test for min span length
Repository: accumulo
Updated Branches:
refs/heads/1.7 61bfbb22e -> 0eef354c5
ACCUMULO-3862 improved how AsyncSpanReceiver drops short spans, added test for min span length
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0eef354c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0eef354c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0eef354c
Branch: refs/heads/1.7
Commit: 0eef354c56e9ef2788c72f4dbaac596653e25bbe
Parents: 61bfbb2
Author: Billie Rinaldi <bi...@apache.org>
Authored: Fri May 29 13:29:32 2015 -0700
Committer: Billie Rinaldi <bi...@apache.org>
Committed: Fri May 29 13:29:40 2015 -0700
----------------------------------------------------------------------
.../accumulo/tracer/AsyncSpanReceiver.java | 14 +-
.../accumulo/tracer/AsyncSpanReceiverTest.java | 129 +++++++++++++++++++
2 files changed, 134 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0eef354c/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
----------------------------------------------------------------------
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
index 28a9088..a35734d 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
@@ -55,7 +55,7 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
public static final String SEND_TIMER_MILLIS = "tracer.send.timer.millis";
public static final String QUEUE_SIZE = "tracer.queue.size";
- private static final String SPAN_MIN_MS = "tracer.span.min.ms";
+ public static final String SPAN_MIN_MS = "tracer.span.min.ms";
private final Map<SpanKey,Destination> clients = new HashMap<SpanKey,Destination>();
@@ -109,14 +109,6 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
while (!sendQueue.isEmpty()) {
boolean sent = false;
RemoteSpan s = sendQueue.peek();
- if (s.stop - s.start < minSpanSize) {
- synchronized (sendQueue) {
- sendQueue.remove();
- sendQueue.notifyAll();
- sendQueueSize.decrementAndGet();
- }
- continue;
- }
SpanKey dest = getSpanKey(s.data);
Destination client = clients.get(dest);
if (client == null) {
@@ -167,6 +159,10 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
@Override
public void receiveSpan(Span s) {
+ if (s.getStopTimeMillis() - s.getStartTimeMillis() < minSpanSize) {
+ return;
+ }
+
Map<String,String> data = convertToStrings(s.getKVAnnotations());
SpanKey dest = getSpanKey(data);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0eef354c/server/tracer/src/test/java/org/apache/accumulo/tracer/AsyncSpanReceiverTest.java
----------------------------------------------------------------------
diff --git a/server/tracer/src/test/java/org/apache/accumulo/tracer/AsyncSpanReceiverTest.java b/server/tracer/src/test/java/org/apache/accumulo/tracer/AsyncSpanReceiverTest.java
new file mode 100644
index 0000000..6744efc
--- /dev/null
+++ b/server/tracer/src/test/java/org/apache/accumulo/tracer/AsyncSpanReceiverTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tracer;
+
+import org.apache.accumulo.tracer.thrift.RemoteSpan;
+import org.apache.htrace.HTraceConfiguration;
+import org.apache.htrace.Span;
+import org.apache.htrace.impl.MilliSpan;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+public class AsyncSpanReceiverTest {
+ static class TestReceiver extends AsyncSpanReceiver<String,String> {
+ AtomicInteger spansSent = new AtomicInteger(0);
+
+ TestReceiver() {
+ super(HTraceConfiguration.EMPTY);
+ }
+
+ TestReceiver(HTraceConfiguration conf) {
+ super(conf);
+ }
+
+ @Override
+ protected String createDestination(String o) throws Exception {
+ return "DEST";
+ }
+
+ @Override
+ protected void send(String resource, RemoteSpan span) throws Exception {
+ spansSent.incrementAndGet();
+ }
+
+ @Override
+ protected String getSpanKey(Map data) {
+ return "DEST";
+ }
+
+ int getSpansSent() {
+ return spansSent.get();
+ }
+
+ int getQueueSize() {
+ return sendQueueSize.get();
+ }
+ }
+
+ Span createSpan(long length) {
+ long time = System.currentTimeMillis();
+ Span span = new MilliSpan.Builder().begin(time).end(time + length).description("desc").parents(Collections.<Long> emptyList()).spanId(1).traceId(2).build();
+ return span;
+ }
+
+ @Test
+ public void test() throws InterruptedException {
+ TestReceiver receiver = new TestReceiver();
+
+ receiver.receiveSpan(createSpan(0));
+ while (receiver.getQueueSize() > 0) {
+ Thread.sleep(500);
+ }
+ assertEquals(0, receiver.getQueueSize());
+ assertEquals(0, receiver.getSpansSent());
+
+ receiver.receiveSpan(createSpan(1));
+ while (receiver.getQueueSize() > 0) {
+ Thread.sleep(500);
+ }
+ assertEquals(0, receiver.getQueueSize());
+ assertEquals(1, receiver.getSpansSent());
+ }
+
+ @Test
+ public void testKeepAll() throws InterruptedException {
+ TestReceiver receiver = new TestReceiver(HTraceConfiguration.fromMap(Collections.singletonMap(AsyncSpanReceiver.SPAN_MIN_MS, "0")));
+
+ receiver.receiveSpan(createSpan(0));
+ while (receiver.getQueueSize() > 0) {
+ Thread.sleep(500);
+ }
+ assertEquals(0, receiver.getQueueSize());
+ assertEquals(1, receiver.getSpansSent());
+ }
+
+ @Test
+ public void testExcludeMore() throws InterruptedException {
+ TestReceiver receiver = new TestReceiver(HTraceConfiguration.fromMap(Collections.singletonMap(AsyncSpanReceiver.SPAN_MIN_MS, "10")));
+
+ receiver.receiveSpan(createSpan(0));
+ while (receiver.getQueueSize() > 0) {
+ Thread.sleep(500);
+ }
+ assertEquals(0, receiver.getQueueSize());
+ assertEquals(0, receiver.getSpansSent());
+
+ receiver.receiveSpan(createSpan(9));
+ while (receiver.getQueueSize() > 0) {
+ Thread.sleep(500);
+ }
+ assertEquals(0, receiver.getQueueSize());
+ assertEquals(0, receiver.getSpansSent());
+
+ receiver.receiveSpan(createSpan(10));
+ while (receiver.getQueueSize() > 0) {
+ Thread.sleep(500);
+ }
+ assertEquals(0, receiver.getQueueSize());
+ assertEquals(1, receiver.getSpansSent());
+ }
+}