You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ph...@apache.org on 2010/08/13 22:58:14 UTC
svn commit: r985363 - in /avro/trunk: ./
lang/java/src/java/org/apache/avro/ipc/trace/
lang/java/src/test/java/org/apache/avro/ipc/trace/
share/schemas/org/apache/avro/ipc/trace/
Author: philz
Date: Fri Aug 13 20:58:13 2010
New Revision: 985363
URL: http://svn.apache.org/viewvc?rev=985363&view=rev
Log:
AVRO-606. Add File-Based Span Storage to TracePlugin (Contributed by Patrick Wendell)
Added:
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/FileSpanStorage.java
avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestFileSpanStorage.java
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java
avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java
avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl
avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Aug 13 20:58:13 2010
@@ -21,6 +21,9 @@ Avro 1.4.0 (unreleased)
NEW FEATURES
+ AVRO-606. Add File-Based Span Storage to TracePlugin
+ (Patrick Wendell via philz)
+
AVRO-595. Add Basic Trace Collection and Propagation.
(Patrick Wendell via philz)
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/FileSpanStorage.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/FileSpanStorage.java?rev=985363&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/FileSpanStorage.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/FileSpanStorage.java Fri Aug 13 20:58:13 2010
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc.trace;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A file-based {@link SpanStorage} implementation for Avro's
+ * {@link TracePlugin}. This class has two modes, one in which writes are
+ * buffered and one in which they are not. Even without buffering, there will be
+ * some delay between reporting of a Span and the actual disk write.
+ */
+public class FileSpanStorage implements SpanStorage {
+ /*
+ * We use rolling Avro data files that store Span data associated with ten
+ * minute chunks (this provides a simple way to index on time). Because we
+ * enforce an upper limit on the number of spans stored, we simply drop
+ * oldest file if and when the next write causes us to exceed that limit. This
+ * approximates a FIFO queue of spans, which is basically what we want to
+ * maintain.
+ *
+ * Focus is on efficiency since most logic occurs every
+ * time a span is recorded (that is, every RPC call).
+ *
+ * We never want to block on span adding operations, which occur in the same
+ * thread as the Requestor. We are okay to block on span retrieving
+ * operations, since they typically run in the RPCPlugin's own servlet. To
+ * avoid blocking on span addition we use a separate writer thread which
+ * reads a BlockingQueue of Spans and writes span data to disk.
+ */
+
+  /**
+   * Background task that takes spans off the shared queue and appends them
+   * to rolling Avro data files. It rolls over to a new file when the current
+   * time bucket has passed and deletes the oldest files once the span limit
+   * of the enclosing storage is reached.
+   */
+  private class DiskWriterThread implements Runnable {
+
+    /** Shared Span queue. Read-only for this thread. */
+    private BlockingQueue<Span> outstanding;
+
+    /** Shared map of files currently in view. Read/write for this thread. */
+    private TreeMap<Long, File> files;
+
+    /** How many Spans already written to each file. */
+    private HashMap<File, Long> spansPerFile = new HashMap<File, Long>();
+
+    /** Total spans already written so far. */
+    private long spansSoFar;
+
+    /** DiskWriter for current file, or null when no file is open. */
+    private DataFileWriter<Span> currentWriter;
+
+    /** File behind currentWriter, or null when no file is open. */
+    private File currentFile;
+
+    /** Timestamp of the current file. */
+    private long currentTimestamp = (long) 0;
+
+    /** Whether to buffer file writes.*/
+    private boolean doBuffer;
+
+    /** Compression level for files. */
+    private int compressionLevel;
+
+    /**
+     * Thread that runs continuously and writes outstanding requests to
+     * Avro files. This thread also deals with rolling files over and dropping
+     * old files when the span limit is reached.
+     * @param outstanding queue of spans waiting to be written
+     * @param files shared map of data files, keyed by bucket start second
+     * @param buffer if false, the writer is flushed after every span
+     * @param compressionLevel deflate level used for every data file
+     */
+    public DiskWriterThread(BlockingQueue<Span> outstanding,
+        TreeMap<Long, File> files, boolean buffer,
+        int compressionLevel) {
+      this.outstanding = outstanding;
+      this.files = files;
+      this.doBuffer = buffer;
+      this.compressionLevel = compressionLevel;
+
+      /* TODO(pwendell) load existing files here.
+       *
+       * If we decide to re-factor TracePlugin such that only one exists, we
+       * can safely load old span files and include them here.*/
+    }
+
+    /**
+     * Write queued spans until the thread is interrupted. An I/O failure
+     * loses the span being written but does not stop the loop.
+     */
+    public void run() {
+      while (true) {
+        Span s;
+        try {
+          s = this.outstanding.take();
+        } catch (InterruptedException e1) {
+          // Stop instead of looping: with the interrupt flag restored,
+          // take() would throw again immediately, and there is no span
+          // to append anyway.
+          LOG.warn("Thread interrupted");
+          Thread.currentThread().interrupt();
+          closeCurrentWriter();
+          return;
+        }
+        try {
+          assureCurrentWriter();
+          this.currentWriter.append(s);
+          if (!this.doBuffer) {
+            this.currentWriter.flush();
+          }
+          this.spansSoFar += 1;
+          // Count against the file we actually wrote to rather than asking
+          // the shared map, which clear() may have emptied in the meantime.
+          Long fileSpans = this.spansPerFile.get(this.currentFile);
+          this.spansPerFile.put(this.currentFile,
+              (fileSpans == null ? 0L : fileSpans.longValue()) + 1);
+        } catch (IOException e) {
+          LOG.warn("Error setting span file: " + e.getMessage(), e);
+        }
+      }
+    }
+
+    /**
+     * Close the current writer, if any, and forget the current file. A
+     * failure to close is logged; the writer is dropped either way so it is
+     * never closed twice.
+     */
+    private void closeCurrentWriter() {
+      if (this.currentWriter == null) {
+        return;
+      }
+      try {
+        this.currentWriter.close();
+      } catch (IOException e) {
+        LOG.warn("Error closing span file: " + e.getMessage(), e);
+      } finally {
+        this.currentWriter = null;
+        this.currentFile = null;
+        this.currentTimestamp = (long) 0;
+      }
+    }
+
+    /**
+     * Assure that currentWriter is populated and refers to the correct
+     * data file. This may roll-over the existing data file. Also assures
+     * that writing one more span will not violate limits on Span storage.
+     * @throws IOException if a new data file cannot be created
+     */
+    private void assureCurrentWriter() throws IOException {
+      // Will we overshoot policy? Drop oldest files until we will not.
+      while (this.spansSoFar >= maxSpans) {
+        File oldest = null;
+        synchronized (this.files) {
+          // The map can be empty here (cleared externally, or a
+          // non-positive limit), so do not assume a first key exists.
+          if (!this.files.isEmpty()) {
+            oldest = this.files.remove(this.files.firstKey());
+          }
+        }
+        if (oldest == null) {
+          break; // nothing left to drop; counters are reset below
+        }
+        Long dropped = this.spansPerFile.remove(oldest);
+        if (dropped != null) {
+          this.spansSoFar -= dropped.longValue();
+        }
+        if (oldest.equals(this.currentFile)) {
+          // Release the handle before deleting the file it points at.
+          closeCurrentWriter();
+        }
+        oldest.delete();
+      }
+
+      boolean noFiles;
+      synchronized (this.files) {
+        noFiles = this.files.isEmpty();
+      }
+      if (noFiles) {
+        // No file is in view any more (all dropped above, or removed by
+        // clear()), so nothing counts against the limit.
+        closeCurrentWriter();
+        this.spansPerFile.clear();
+        this.spansSoFar = 0;
+      }
+
+      long rightNow = System.currentTimeMillis() / 1000L;
+
+      // What file should we be in
+      long cutOff = floorSecond(rightNow);
+
+      // Test for roll-over.
+      if (this.currentWriter != null
+          && cutOff >= (this.currentTimestamp + secondsPerFile)) {
+        closeCurrentWriter();
+      }
+
+      if (this.currentWriter == null) {
+        File newFile = new File(traceFileDir + "/" +
+            Thread.currentThread().getId() + "_" + cutOff + FILE_SUFFIX);
+        DataFileWriter<Span> newWriter = new DataFileWriter<Span>(SPAN_WRITER);
+        newWriter.setCodec(CodecFactory.deflateCodec(compressionLevel));
+        newWriter.create(Span.SCHEMA$, newFile);
+        // Publish the file only after it was created successfully, so that
+        // a failed create leaves no half-initialised state behind.
+        synchronized (this.files) {
+          this.files.put(cutOff, newFile);
+        }
+        this.spansPerFile.put(newFile, (long) 0);
+        this.currentWriter = newWriter;
+        this.currentFile = newFile;
+        this.currentTimestamp = cutOff;
+      }
+    }
+  }
+ /** Suffix appended to the name of every span data file. */
+ private final static String FILE_SUFFIX = ".av";
+
+ /** Datum writer and reader for Span records, shared by all data files. */
+ private final static SpecificDatumWriter<Span> SPAN_WRITER =
+ new SpecificDatumWriter<Span>(Span.class);
+ private final static SpecificDatumReader<Span> SPAN_READER =
+ new SpecificDatumReader<Span>(Span.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileSpanStorage.class);
+
+ /** Span limit; the writer thread drops the oldest file once it is reached. */
+ private long maxSpans = DEFAULT_MAX_SPANS;
+
+ /** Granularity of file chunks. */
+ private int secondsPerFile = 60 * 10; // default: ten minute chunks
+
+ /** Directory of data files. */
+ private String traceFileDir = "/tmp";
+
+ /**
+ * Files currently in view, keyed by the start second of their time bucket.
+ * Shared with the writer thread. The query methods and clear() of this
+ * class synchronize on the map itself.
+ */
+ private TreeMap<Long, File> files = new TreeMap<Long, File>();
+
+ /** Shared Span queue. This thread only writes. */
+ LinkedBlockingQueue<Span> outstanding = new LinkedBlockingQueue<Span>();
+
+ /** DiskWriter thread */
+ private Thread writer;
+
+  /**
+   * Given a path to a data file of Spans, extract all spans and add them
+   * to the provided list. Nothing is added if reading fails part-way.
+   * The file is always closed before returning.
+   * @param f data file to read
+   * @param list receives every span found in the file
+   * @throws IOException if the file cannot be opened or closed
+   */
+  private static void readFileSpans(File f, List<Span> list)
+      throws IOException {
+    DataFileReader<Span> reader = new DataFileReader<Span>(f, SPAN_READER);
+    try {
+      Iterator<Span> it = reader.iterator();
+      // Collect locally first so a failure mid-file leaves 'list' untouched.
+      ArrayList<Span> spans = new ArrayList<Span>();
+      while (it.hasNext()) {
+        spans.add(it.next());
+      }
+      list.addAll(spans);
+    } finally {
+      // Release the file handle; getAllSpans() may call this for many files.
+      reader.close();
+    }
+  }
+
+  /**
+   * Given a path to a data file of Spans, extract spans within a time period
+   * bounded by start and end, as decided by Util.spanInRange. Nothing is
+   * added if reading fails part-way. The file is always closed before
+   * returning.
+   * @param f data file to read
+   * @param list receives the matching spans
+   * @param start lower bound passed to Util.spanInRange
+   * @param end upper bound passed to Util.spanInRange
+   * @throws IOException if the file cannot be opened or closed
+   */
+  private static void readFileSpans(File f, List<Span> list,
+      long start, long end) throws IOException {
+    DataFileReader<Span> reader = new DataFileReader<Span>(f, SPAN_READER);
+    try {
+      Iterator<Span> it = reader.iterator();
+      // Collect locally first so a failure mid-file leaves 'list' untouched.
+      ArrayList<Span> spans = new ArrayList<Span>();
+      while (it.hasNext()) {
+        Span s = it.next();
+        if (Util.spanInRange(s, start, end)) {
+          spans.add(s);
+        }
+      }
+      list.addAll(spans);
+    } finally {
+      // Release the file handle even when iteration throws.
+      reader.close();
+    }
+  }
+
+  /**
+   * Create a file-backed span store and start its disk writer thread.
+   * @param buffer whether the writer may skip flushing after each span
+   * @param conf supplies compression level, file granularity and directory
+   */
+  public FileSpanStorage(boolean buffer, TracePluginConfiguration conf) {
+    Runnable diskWriter = new DiskWriterThread(
+        this.outstanding, this.files, buffer, conf.compressionLevel);
+    this.secondsPerFile = conf.fileGranularitySeconds;
+    this.traceFileDir = conf.spanStorageDir;
+    // Settings are in place before the thread starts reading them.
+    this.writer = new Thread(diskWriter);
+    this.writer.start();
+  }
+
+  /**
+   * Round a second down to the start of the time bucket containing it,
+   * where buckets are secondsPerFile long.
+   */
+  private long floorSecond(long currentSecond) {
+    long secondsIntoBucket = currentSecond % this.secondsPerFile;
+    return currentSecond - secondsIntoBucket;
+  }
+
+  /**
+   * Queue a span for the writer thread; the disk write happens later on
+   * that thread.
+   */
+  @Override
+  public void addSpan(Span s) {
+    outstanding.add(s);
+  }
+
+  /**
+   * Read back the spans of every file currently in view. A file that cannot
+   * be read is logged and skipped.
+   */
+  @Override
+  public List<Span> getAllSpans() {
+    ArrayList<Span> collected = new ArrayList<Span>();
+    synchronized (this.files) {
+      Iterator<File> fileIt = this.files.values().iterator();
+      while (fileIt.hasNext()) {
+        File dataFile = fileIt.next();
+        try {
+          readFileSpans(dataFile, collected);
+        } catch (IOException e) {
+          String path = dataFile.getAbsolutePath();
+          LOG.warn("Error reading file: " + path);
+        }
+      }
+    }
+    return collected;
+  }
+
+  /**
+   * Clear all Span data stored by this plugin: every file in view is
+   * removed from the map and deleted from disk.
+   */
+  public void clear() {
+    synchronized (this.files) {
+      Iterator<File> fileIt = this.files.values().iterator();
+      while (fileIt.hasNext()) {
+        File dataFile = fileIt.next();
+        fileIt.remove();
+        dataFile.delete();
+      }
+    }
+  }
+
+  /**
+   * Set the span limit that the writer thread checks before each write.
+   */
+  @Override
+  public void setMaxSpans(long maxSpans) {
+    long newLimit = maxSpans;
+    this.maxSpans = newLimit;
+  }
+
+  /**
+   * Return the stored spans that fall between start and end.
+   * @param start UNIX time in nanoseconds
+   * @param end UNIX time in nanoseconds
+   * @return matching spans, in file order; empty if end precedes start
+   */
+  @Override
+  public List<Span> getSpansInRange(long start, long end) {
+    /*
+     * We first find the book-end files (first and last) whose Spans may
+     * or may not fit in the range. Intermediary files can be directly
+     * passed, since they are completely within the time range.
+     *
+     *           [                              ]   <-- Time range
+     *
+     * |-----++|+++++++|+++++++|+++++++|+++----|-------|--->
+     *  \     /                         \     /
+     *   start                            end
+     *   file                             file
+     *
+     */
+    List<Span> out = new ArrayList<Span>();
+
+    // Files are keyed by the start of their time bucket, so both ends of the
+    // range must be floored to a bucket start before looking anything up.
+    long startBucket = floorSecond(start / SpanStorage.NANOS_PER_SECOND);
+    long endBucket = floorSecond(end / SpanStorage.NANOS_PER_SECOND);
+    if (startBucket > endBucket) {
+      return out;
+    }
+
+    synchronized (this.files) {
+      // One pass over the files in [startBucket, endBucket]; each file is
+      // read exactly once, even when start and end share a bucket.
+      for (Long bucket :
+          this.files.subMap(startBucket, true, endBucket, true).keySet()) {
+        File f = this.files.get(bucket);
+        boolean bookEnd = bucket.longValue() == startBucket
+            || bucket.longValue() == endBucket;
+        try {
+          if (bookEnd) {
+            readFileSpans(f, out, start, end);
+          } else {
+            readFileSpans(f, out);
+          }
+        } catch (IOException e) {
+          LOG.warn("Error reading file: " + f.getAbsolutePath(), e);
+        }
+      }
+    }
+    return out;
+  }
+}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java Fri Aug 13 20:58:13 2010
@@ -30,7 +30,6 @@ import java.util.List;
*/
public class InMemorySpanStorage implements SpanStorage {
private static final long DEFAULT_MAX_SPANS = 10000;
-
protected LinkedList<Span> spans;
private long maxSpans;
@@ -65,4 +64,15 @@ public class InMemorySpanStorage impleme
public List<Span> getAllSpans() {
return (LinkedList<Span>) this.spans.clone();
}
+
+  /**
+   * Return the stored spans for which Util.spanInRange accepts the
+   * given bounds, in storage order.
+   */
+  @Override
+  public List<Span> getSpansInRange(long start, long end) {
+    List<Span> matching = new LinkedList<Span>();
+    for (Span candidate : this.spans) {
+      if (!Util.spanInRange(candidate, start, end)) {
+        continue;
+      }
+      matching.add(candidate);
+    }
+    return matching;
+  }
}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java Fri Aug 13 20:58:13 2010
@@ -28,6 +28,10 @@ import java.util.List;
*
*/
public interface SpanStorage {
+ long DEFAULT_MAX_SPANS = 10000L;
+ long MILLIS_PER_SECOND = 1000L;
+ long NANOS_PER_SECOND = 1000000000L;
+
/**
* Add a span.
* @param s
@@ -43,4 +47,11 @@ public interface SpanStorage {
* Return a list of all spans currently stored. For testing.
*/
List<Span> getAllSpans();
+
+ /**
+ * Return a list of all spans that fall within the given time range.
+ * @param start UNIX time (in nanoseconds) as a long
+ * @param end UNIX time (in nanoseconds) as a long
+ */
+ List<Span> getSpansInRange(long start, long end);
}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java Fri Aug 13 20:58:13 2010
@@ -84,12 +84,23 @@ public class TracePlugin extends RPCPlug
public GenericArray<Span> getAllSpans() throws AvroRemoteException {
List<Span> spans = this.spanStorage.getAllSpans();
GenericData.Array<Span> out;
- synchronized (spans) {
- out = new GenericData.Array<Span>(spans.size(),
- Schema.createArray(Span.SCHEMA$));
- for (Span s: spans) {
- out.add(s);
- }
+ out = new GenericData.Array<Span>(spans.size(),
+ Schema.createArray(Span.SCHEMA$));
+ for (Span s: spans) {
+ out.add(s);
+ }
+ return out;
+ }
+
+ @Override
+ public GenericArray<Span> getSpansInRange(long start, long end)
+ throws AvroRemoteException {
+ List<Span> spans = this.spanStorage.getSpansInRange(start, end);
+ GenericData.Array<Span> out;
+ out = new GenericData.Array<Span>(spans.size(),
+ Schema.createArray(Span.SCHEMA$));
+ for (Span s: spans) {
+ out.add(s);
}
return out;
}
@@ -139,9 +150,12 @@ public class TracePlugin extends RPCPlug
}
};
- if (storageType.equals("MEMORY")) {
+ if (storageType == StorageType.MEMORY) {
this.storage = new InMemorySpanStorage();
}
+ else if (storageType == StorageType.DISK) {
+ this.storage = new FileSpanStorage(false, conf);
+ }
else { // default
this.storage = new InMemorySpanStorage();
}
@@ -288,6 +302,14 @@ public class TracePlugin extends RPCPlug
}
}
+  /**
+   * Stop the client-facing server. Best effort: any exception raised while
+   * stopping is discarded.
+   */
+  public void stopClientServer() {
+    try {
+      this.clientFacingServer.stop();
+    } catch (Exception ignored) {
+      // ignore
+    }
+  }
+
/**
* Start a client-facing server. Can be overridden if users
* prefer to attach client Servlet to their own server.
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java Fri Aug 13 20:58:13 2010
@@ -30,6 +30,12 @@ public class TracePluginConfiguration {
public StorageType storageType; // How to store spans
public long maxSpans; // Max number of spans to store
public boolean enabled; // Whether or not we are active
+ public boolean buffer; // If disk storage, whether to buffer writes
+ public int compressionLevel; // If using file storage, what compression
+ // level (0-9).
+ public int fileGranularitySeconds; // How many seconds of span data to store
+ // in each file.
+ public String spanStorageDir; // where to store span data, if file-based
/**
* Return a TracePluginConfiguration with default options.
@@ -41,5 +47,9 @@ public class TracePluginConfiguration {
this.storageType = StorageType.MEMORY;
this.maxSpans = 10000;
this.enabled = true;
+ this.buffer = true;
+ this.compressionLevel = 9;
+ this.fileGranularitySeconds = 500;
+ this.spanStorageDir = "/tmp";
}
}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java Fri Aug 13 20:58:13 2010
@@ -53,7 +53,6 @@ class Util {
return foundEvents;
}
-
/**
* Get the size of an RPC payload.
*/
@@ -166,4 +165,25 @@ class Util {
return Arrays.equals(a.bytes(), b.bytes());
}
+  /**
+   * Tests if a span occurred between start and end.
+   *
+   * The span's start is the timestamp of its last CLIENT_SEND or SERVER_RECV
+   * event and its end is the timestamp of its last CLIENT_RECV or
+   * SERVER_SEND event. A span with no end event is judged on its start time
+   * alone; a span with no start event has start time 0.
+   *
+   * @param s span to test
+   * @param start exclusive lower bound, in the units of the event timestamps
+   * @param end exclusive upper bound, in the units of the event timestamps
+   * @return true if the span starts after start and ends before end
+   */
+  public static boolean spanInRange(Span s, long start, long end) {
+    long startTime = 0;
+    long endTime = 0;
+    boolean sawEnd = false;
+
+    for (TimestampedEvent e: s.events) {
+      if (e.event instanceof SpanEvent) {
+        SpanEvent ev = (SpanEvent) e.event;
+        // Each group ends in break: without it a start event would fall
+        // through and overwrite endTime as well.
+        switch (ev) {
+          case CLIENT_SEND:
+          case SERVER_RECV:
+            startTime = e.timeStamp;
+            break;
+          case CLIENT_RECV:
+          case SERVER_SEND:
+            endTime = e.timeStamp;
+            sawEnd = true;
+            break;
+          default:
+            break;
+        }
+      }
+    }
+    if (!sawEnd) {
+      // Keeps the previous result for spans that only recorded a start.
+      endTime = startTime;
+    }
+    return startTime > start && endTime < end;
+  }
}
Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java Fri Aug 13 20:58:13 2010
@@ -37,6 +37,7 @@ import org.apache.avro.ipc.RPCPlugin;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.trace.SpanAggregator.SpanAggregationResults;
import org.apache.avro.ipc.trace.SpanAggregator.TraceFormationResults;
+import org.apache.avro.ipc.trace.TracePlugin.StorageType;
import org.junit.Test;
/**
@@ -137,10 +138,23 @@ public class TestEndToEndTracing {
return currentCount + 1;
}
}
-
+
+  /** Runs the shared end-to-end scenario with in-memory span storage. */
+  @Test
+  public void testTraceAndCollectionMemory() throws Exception {
+    TracePluginConfiguration memoryConf = new TracePluginConfiguration();
+    memoryConf.storageType = StorageType.MEMORY;
+    testTraceAndCollection(memoryConf);
+  }
+
@Test
- public void testTraceAndCollection() throws Exception {
+ public void testTraceAndCollectionDisk() throws Exception {
TracePluginConfiguration conf = new TracePluginConfiguration();
+ conf.storageType = StorageType.DISK;
+ conf.buffer = false;
+ testTraceAndCollection(conf);
+ }
+
+ public void testTraceAndCollection(TracePluginConfiguration conf) throws Exception {
conf.traceProb = 1.0;
conf.port = 51010;
conf.clientPort = 12346;
@@ -182,7 +196,7 @@ public class TestEndToEndTracing {
advancedProtocol.getMessages().get("w").getRequest());
params.put("req", 1);
r.request("w", params);
-
+ Thread.sleep(1000);
ArrayList<Span> allSpans = new ArrayList<Span>();
allSpans.addAll(aPlugin.storage.getAllSpans());
Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestFileSpanStorage.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestFileSpanStorage.java?rev=985363&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestFileSpanStorage.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestFileSpanStorage.java Fri Aug 13 20:58:13 2010
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc.trace;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link FileSpanStorage}.
+ */
+public class TestFileSpanStorage {
+
+  /** A single span added to the storage can be read back from it. */
+  @Test
+  public void testBasicStorage() {
+    FileSpanStorage storage =
+        new FileSpanStorage(false, new TracePluginConfiguration());
+    Span span = Util.createEventlessSpan(Util.idValue(1), Util.idValue(1), null);
+    span.messageName = new Utf8("message");
+    storage.addSpan(span);
+    // Writes happen on another thread; give it time to reach the disk.
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    List<Span> stored = storage.getAllSpans();
+    assertTrue(stored.contains(span));
+    storage.clear();
+  }
+
+  /**
+   * Writes a large number of spans and checks the count, then writes a
+   * smaller batch to a second storage and checks that every one of them
+   * can be read back from that second storage.
+   */
+  @Test
+  public void testTonsOfSpans() throws Exception {
+    TracePluginConfiguration conf = new TracePluginConfiguration();
+    FileSpanStorage test = new FileSpanStorage(false, conf);
+    test.setMaxSpans(100000);
+    List<Span> spans = new ArrayList<Span>(50000);
+    for (int i = 0; i < 50000; i++) {
+      Span s = Util.createEventlessSpan(Util.idValue(i), Util.idValue(i), null);
+      s.messageName = new Utf8("message");
+      test.addSpan(s);
+      spans.add(s);
+    }
+    Thread.sleep(1000);
+    assertEquals(50000, test.getAllSpans().size());
+
+    // Test fewer spans but explicitly call containsAll
+    TracePluginConfiguration conf2 = new TracePluginConfiguration();
+    FileSpanStorage test2 = new FileSpanStorage(false, conf2);
+    // Configure the second storage (this used to re-configure the first).
+    test2.setMaxSpans(100000);
+    spans.clear();
+    for (int i = 0; i < 5000; i++) {
+      Span s = Util.createEventlessSpan(Util.idValue(i), Util.idValue(i), null);
+      s.messageName = new Utf8("message");
+      test2.addSpan(s);
+      spans.add(s);
+    }
+    // Writes are asynchronous: wait for them, but give up after 5 seconds
+    // so a stuck writer fails the assertion instead of hanging the test.
+    long deadline = System.currentTimeMillis() + 5000;
+    while (test2.getAllSpans().size() < spans.size()
+        && System.currentTimeMillis() < deadline) {
+      Thread.sleep(50);
+    }
+    // Assert on test2: the first storage already holds ids 0..49999, so
+    // checking it would pass even if test2 had stored nothing.
+    assertTrue(test2.getAllSpans().containsAll(spans));
+    test.clear();
+    test2.clear();
+  }
+
+  /**
+   * With a limit of 10 spans, writing 100 spans followed by 9 more is
+   * expected to leave exactly those last 9 retrievable.
+   */
+  @Test
+  public void testBasicMaxSpans() {
+    FileSpanStorage storage =
+        new FileSpanStorage(false, new TracePluginConfiguration());
+    storage.setMaxSpans(10);
+
+    final int fillerCount = 100;
+    final int keptCount = 9;
+    List<Span> lastNine = new LinkedList<Span>();
+
+    // Ids 0..99 are filler; ids 100..108 are the ones we expect to survive.
+    for (int id = 0; id < fillerCount + keptCount; id++) {
+      Span span = Util.createEventlessSpan(Util.idValue(id), Util.idValue(id), null);
+      span.messageName = new Utf8("message");
+      if (id >= fillerCount) {
+        lastNine.add(span);
+      }
+      storage.addSpan(span);
+    }
+
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    List<Span> retrieved = storage.getAllSpans();
+    assertEquals(9, retrieved.size());
+    assertTrue(retrieved.containsAll(lastNine));
+
+    storage.clear();
+  }
+
+ /**
+ * Writes ten spans roughly one second apart and checks that a range query
+ * bounded by two recorded cut-offs returns the spans created between them.
+ * The statement order matters: the cut-offs are wall-clock readings taken
+ * at specific points of the loop.
+ */
+ @Test
+ public void testRangeQuery1() {
+ TracePluginConfiguration conf = new TracePluginConfiguration();
+ // One-second files; with the one-second sleep below each span is
+ // expected to land in a different file.
+ conf.fileGranularitySeconds = 1;
+ FileSpanStorage test = new FileSpanStorage(false, conf);
+ test.setMaxSpans(10000);
+
+ // Range bounds in nanoseconds (milliseconds * 1000000).
+ long cutOff1 = 0;
+ long cutOff2 = 0;
+
+ int numSpans = 10;
+
+ Span[] spans = new Span[numSpans];
+ // Add some spans
+ for (int i = 0; i < numSpans; i++) {
+ // Lower bound: 20 ms before span 1 is created, i.e. after span 0.
+ if (i == 1) { cutOff1 = (System.currentTimeMillis() - 20) * 1000000; }
+
+ Span s = Util.createEventlessSpan(Util.idValue(i), Util.idValue(i), null);
+ TimestampedEvent te1 = new TimestampedEvent();
+ te1.timeStamp = System.currentTimeMillis() * 1000000;
+ te1.event = SpanEvent.CLIENT_SEND;
+
+ TimestampedEvent te2 = new TimestampedEvent();
+ te2.timeStamp = System.currentTimeMillis() * 1000000;
+ te2.event = SpanEvent.CLIENT_RECV;
+ s.events.add(te1);
+ s.events.add(te2);
+
+ s.messageName = new Utf8("message");
+ test.addSpan(s);
+ spans[i] = s;
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+
+ }
+ // Upper bound: taken after the sleep that follows the second-to-last
+ // span, so before the last span is created.
+ if (i == numSpans - 2) {
+ cutOff2 = (System.currentTimeMillis() - 20) * 1000000;
+ }
+ }
+
+ // Let the writer thread finish the last write.
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+
+ List<Span> retrieved = test.getSpansInRange(cutOff1, cutOff2);
+ // Every span except the first and the last.
+ assertEquals(numSpans - 2, retrieved.size());
+
+ assertFalse(retrieved.contains(spans[0]));
+ // NOTE(review): this loop stops before index numSpans - 2, so that span
+ // is only covered by the size assertion above.
+ for (int j=1; j < numSpans - 2; j++) {
+ assertTrue(retrieved.contains(spans[j]));
+ }
+ assertFalse(retrieved.contains((spans[spans.length - 1])));
+
+ test.clear();
+ }
+}
Modified: avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl
URL: http://svn.apache.org/viewvc/avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl (original)
+++ avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl Fri Aug 13 20:58:13 2010
@@ -33,5 +33,14 @@ protocol AvroTrace {
boolean complete; // Whether includes data from both sides
}
+ /**
+ * Get all spans stored on this host.
+ */
array<Span> getAllSpans();
+
+ /**
+ * Get spans occurring between start and end. Each is a unix timestamp
+ * in nanosecond units (for consistency with TimestampedEvent).
+ */
+ array<Span> getSpansInRange(long start, long end);
}
Modified: avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr
URL: http://svn.apache.org/viewvc/avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr (original)
+++ avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr Fri Aug 13 20:58:13 2010
@@ -64,6 +64,19 @@
"type" : "array",
"items" : "Span"
}
+ },
+ "getSpansInRange" : {
+ "request" : [ {
+ "name" : "start",
+ "type" : "long"
+ }, {
+ "name" : "end",
+ "type" : "long"
+ } ],
+ "response" : {
+ "type" : "array",
+ "items" : "Span"
+ }
}
}
-}
\ No newline at end of file
+}