You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ph...@apache.org on 2010/08/13 22:58:14 UTC

svn commit: r985363 - in /avro/trunk: ./ lang/java/src/java/org/apache/avro/ipc/trace/ lang/java/src/test/java/org/apache/avro/ipc/trace/ share/schemas/org/apache/avro/ipc/trace/

Author: philz
Date: Fri Aug 13 20:58:13 2010
New Revision: 985363

URL: http://svn.apache.org/viewvc?rev=985363&view=rev
Log:
AVRO-606. Add File-Based Span Storage to TracePlugin (Contributed by Patrick Wendell)


Added:
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/FileSpanStorage.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestFileSpanStorage.java
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java
    avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl
    avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Aug 13 20:58:13 2010
@@ -21,6 +21,9 @@ Avro 1.4.0 (unreleased)
 
   NEW FEATURES
 
+    AVRO-606. Add File-Based Span Storage to TracePlugin
+    (Patrick Wendell via philz)
+
     AVRO-595. Add Basic Trace Collection and Propagation.
     (Patrick Wendell via philz)
 

Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/FileSpanStorage.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/FileSpanStorage.java?rev=985363&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/FileSpanStorage.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/FileSpanStorage.java Fri Aug 13 20:58:13 2010
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc.trace;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A file-based { @link SpanStorage } implementation for Avro's 
+ * { @link TracePlugin }. This class has two modes, one in which writes are 
+ * buffered and one in which they are not. Even without buffering, there will be
+ * some delay between reporting of a Span and the actual disk write.
+ */
+public class FileSpanStorage implements SpanStorage {
+  /*
+   * We use rolling Avro data files that store Span data associated with ten 
+   * minute chunks (this provides a simple way to index on time). Because we 
+   * enforce an upper limit on the number of spans stored, we simply drop 
+   * oldest file if and when the next write causes us to exceed that limit. This
+   * approximates a FIFO queue of spans, which is basically what we want to 
+   * maintain.
+   * 
+   * Focus is on efficiency since most logic occurs every
+   * time a span is recorded (that is, every RPC call).
+   * 
+   * We never want to block on span adding operations, which occur in the same
+   * thread as the Requestor. We are okay to block on span retrieving 
+   * operations, since they typically run the RPCPlugin's own servlet. To
+   * avoid blocking on span addition we use a separate WriterThread which reads
+   * a BlockingQueue of Spans and writes span data to disk.  
+   */
+  
+  private class DiskWriterThread implements Runnable {
+
+    /** Shared Span queue. Read-only for this thread. */
+    private BlockingQueue<Span> outstanding;
+    
+    /** Shared queue of files currently in view. Read/write for this thread. */
+    private TreeMap<Long, File> files;
+    
+    /** How many Spans already written to each file. */
+    private HashMap<File, Long> spansPerFile = new HashMap<File, Long>();  
+    
+    /** Total spans already written so far. */
+    private long spansSoFar;
+        
+    /** DiskWriter for current file. */
+    private DataFileWriter<Span> currentWriter;
+    
+    /** Timestamp of the current file. */
+    private long currentTimestamp = (long) 0;
+    
+    /** Whether to buffer file writes.*/
+    private boolean doBuffer;
+    
+    /** Compression level for files. */
+    private int compressionLevel;
+    
+    /**
+     * Thread that runs continuously and writes outstanding requests to
+     * Avro files. This thread also deals with rolling files over and dropping
+     * old files when the span limit is reached.
+     * @param compressionLevel 
+     */
+    public DiskWriterThread(BlockingQueue<Span> outstanding,  
+        TreeMap<Long, File> files, boolean buffer, 
+        int compressionLevel) {
+      this.outstanding = outstanding;
+      this.files = files;
+      this.doBuffer = buffer;
+      this.compressionLevel = compressionLevel; 
+      
+      /* TODO(pwendell) load existing files here.
+       * 
+       * If we decide to re-factor TracePlugin such that only one exists, we 
+       * can safely load old span files and include them here.*/ 
+    }
+    
+    public void run() {
+      while (true) {
+        Span s = null;
+        try {
+          s = this.outstanding.take();
+        } catch (InterruptedException e1) {
+          LOG.warn("Thread interrupted");
+          Thread.currentThread().interrupt();
+        }
+        try {
+          assureCurrentWriter();
+          this.currentWriter.append(s);
+          if (!this.doBuffer) this.currentWriter.flush();
+          this.spansSoFar += 1;
+          File latest = this.files.lastEntry().getValue();
+          long fileSpans = this.spansPerFile.get(latest);
+          this.spansPerFile.put(latest, fileSpans + 1);
+        } catch (IOException e) {
+          LOG.warn("Error setting span file: " + e.getMessage());
+        }
+      }
+    }
+    
+    /**
+     * Assure that currentWriter is populated and refers to the correct
+     * data file. This may roll-over the existing data file. Also assures
+     * that writing one more span will not violate limits on Span storage.
+     * @throws IOException 
+     */
+    private void assureCurrentWriter() throws IOException {
+      boolean createNewFile = false;
+      
+      // Will we overshoot policy?
+      while (this.spansSoFar >= maxSpans) {
+        File oldest = null;
+        // If spansSoFar is positive, there must be at least one file
+        synchronized (this.files) {
+          oldest = this.files.remove(this.files.firstKey());
+        }
+        this.spansSoFar -= spansPerFile.get(oldest);
+        spansPerFile.remove(oldest);
+        oldest.delete();
+      }
+      if (files.size() == 0) { 
+        // In corner case we have removed the current file,
+        // if that happened we need to clear current variables.
+        currentTimestamp = (long) 0;
+        currentWriter = null;
+      }
+      long rightNow = System.currentTimeMillis() / 1000L;
+      
+      // What file should we be in
+      long cutOff = floorSecond(rightNow);
+      
+      if (currentWriter == null) {
+        createNewFile = true;
+      }
+      // Test for roll-over.
+      else if (cutOff >= (currentTimestamp + secondsPerFile)) {
+        currentWriter.close();
+        createNewFile = true;
+      }
+      
+      if (createNewFile) {
+        File newFile = new File(traceFileDir + "/" + 
+            Thread.currentThread().getId() + "_" + cutOff + FILE_SUFFIX);
+        synchronized (this.files) {
+          this.files.put(cutOff, newFile);
+        }
+        this.spansPerFile.put(newFile, (long) 0);
+        this.currentWriter = new DataFileWriter<Span>(SPAN_WRITER);
+        this.currentWriter.setCodec(CodecFactory.deflateCodec(compressionLevel));
+        this.currentWriter.create(Span.SCHEMA$, newFile);
+        this.currentTimestamp = cutOff;
+      }
+    }
+  }
+  /** Directory of data files */
+  private final static String FILE_SUFFIX = ".av";
+
+  private final static SpecificDatumWriter<Span> SPAN_WRITER = 
+    new SpecificDatumWriter<Span>(Span.class);
+  private final static SpecificDatumReader<Span> SPAN_READER = 
+    new SpecificDatumReader<Span>(Span.class);
+  
+  private static final Logger LOG = LoggerFactory.getLogger(FileSpanStorage.class);
+  
+  private long maxSpans = DEFAULT_MAX_SPANS;
+  
+  /** Granularity of file chunks. */
+  private int secondsPerFile = 60 * 10; // default: ten minute chunks
+  
+  private String traceFileDir = "/tmp";
+  
+  /** Shared queue of files currently in view. This thread only reads.*/
+  private TreeMap<Long, File> files = new TreeMap<Long, File>();
+  
+  /** Shared Span queue. This thread only writes. */
+  LinkedBlockingQueue<Span> outstanding = new LinkedBlockingQueue<Span>();
+  
+  /** DiskWriter thread */
+  private Thread writer;
+    
+  /**
+   * Given a path to a data file of Spans, extract all spans and add them
+   * to the provided list.
+   */
+  private static void readFileSpans(File f, List<Span> list)
+    throws IOException {
+    DataFileReader<Span> reader = new DataFileReader<Span>(f, SPAN_READER);
+    Iterator<Span> it = reader.iterator();
+    ArrayList<Span> spans = new ArrayList<Span>();
+    while (it.hasNext()) {
+      spans.add(it.next());
+    }
+    list.addAll(spans);
+  }
+  
+  /**
+   * Given a path to a data file of Spans, extract spans within a time period
+   * bounded by start and end.
+   */
+  private static void readFileSpans(File f, List<Span> list, 
+      long start, long end) throws IOException {
+    DataFileReader<Span> reader = new DataFileReader<Span>(f, SPAN_READER);
+    Iterator<Span> it = reader.iterator();
+    ArrayList<Span> spans = new ArrayList<Span>();
+    while (it.hasNext()) {
+      Span s = it.next();
+      if (Util.spanInRange(s, start, end)) {
+        spans.add(s);
+      }
+    }
+    list.addAll(spans);
+  }
+  
+  public FileSpanStorage(boolean buffer, TracePluginConfiguration conf) {
+    this.writer = new Thread(new DiskWriterThread(
+        outstanding, files, buffer, conf.compressionLevel));
+    this.secondsPerFile = conf.fileGranularitySeconds;
+    this.traceFileDir = conf.spanStorageDir;
+    this.writer.start();
+  }
+  
+  /**
+   * Return the head of the time bucket associated with this specific time.
+   */
+  private long floorSecond(long currentSecond) {
+    return currentSecond - (currentSecond % this.secondsPerFile);
+  }
+  
+  @Override
+  public void addSpan(Span s) {
+    this.outstanding.add(s);
+  }
+
+  @Override
+  public List<Span> getAllSpans() {
+    ArrayList<Span> out = new ArrayList<Span>();
+    synchronized (this.files) { 
+      for (File f: this.files.values()) {
+        try {
+          readFileSpans(f, out);
+        } catch (IOException e) {
+          LOG.warn("Error reading file: " + 
+              f.getAbsolutePath());
+        }
+      }
+    }
+    return out;
+  }
+  
+  /**
+   * Clear all Span data stored by this plugin.
+   */
+  public void clear() {
+    synchronized (this.files) { 
+      for (Long l: new LinkedList<Long>(this.files.keySet())) {
+        File f = this.files.remove(l);
+        f.delete();
+      }
+    }
+  }
+
+  @Override
+  public void setMaxSpans(long maxSpans) {
+    this.maxSpans = maxSpans;
+  }
+
+  @Override
+  public List<Span> getSpansInRange(long start, long end) {
+    /*
+     * We first find the book-end files (first and last) whose Spans may 
+     * or may not fit in the the range. Intermediary files can be directly 
+     * passed, since they are completely within the time range.
+     * 
+     *       [                            ]   <-- Time range
+     *       
+     * |-----++|+++++++|+++++++|+++++++|+++----|-------|--->
+     *  \     /                         \     /
+     *   start                            end
+     *   file                             file
+     * 
+     */ 
+    List<Span> out = new ArrayList<Span>();
+    List<Long> middleFiles = new LinkedList<Long>();
+    
+    long startSecond = start / SpanStorage.NANOS_PER_SECOND;
+    long endSecond = end / SpanStorage.NANOS_PER_SECOND;
+    
+    int numFiles = (int) (endSecond - startSecond) / secondsPerFile;
+    for (int i = 1; i < (numFiles); i++) {
+      middleFiles.add(startSecond + i * secondsPerFile);
+    }
+    
+    synchronized (this.files) { 
+      for (Long l: middleFiles) {
+        if (files.containsKey(l)) {
+          try {
+            readFileSpans(files.get(l), out);
+          } catch (IOException e) {
+            LOG.warn("Error reading file: " + files.get(l).getAbsolutePath());
+          }
+        }
+      }
+      
+      // Start file
+      if (files.containsKey(startSecond)) {
+        try {
+          readFileSpans(files.get(startSecond), out, start, end);
+        } catch (IOException e) {
+          LOG.warn("Error reading file: " + 
+              files.get(startSecond).getAbsolutePath());
+        }
+      }
+      
+      // End file
+      if (files.containsKey(endSecond)) {
+        try {
+          readFileSpans(files.get(endSecond), out, start, end);
+        } catch (IOException e) {
+          LOG.warn("Error reading file: " + 
+              files.get(endSecond).getAbsolutePath());
+        }
+      }
+    }
+    return out;
+  }
+}

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java Fri Aug 13 20:58:13 2010
@@ -30,7 +30,6 @@ import java.util.List;
  */
 public class InMemorySpanStorage implements SpanStorage {
   private static final long DEFAULT_MAX_SPANS = 10000;
-  
   protected LinkedList<Span> spans;
   private long maxSpans;
 
@@ -65,4 +64,15 @@ public class InMemorySpanStorage impleme
   public List<Span> getAllSpans() {
     return (LinkedList<Span>) this.spans.clone();
   }
+
+  @Override
+  public List<Span> getSpansInRange(long start, long end) {
+    List<Span> out = new LinkedList<Span>();
+    for (Span s: this.spans) {
+      if (Util.spanInRange(s, start, end)) {
+        out.add(s);
+      }
+    }
+    return out;
+  }
 }

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java Fri Aug 13 20:58:13 2010
@@ -28,6 +28,10 @@ import java.util.List;
  *
  */
 public interface SpanStorage {
+  long DEFAULT_MAX_SPANS = 10000L;
+  long MILLIS_PER_SECOND = 1000L;
+  long NANOS_PER_SECOND = 1000000000L;
+  
   /**
    * Add a span. 
    * @param s
@@ -43,4 +47,11 @@ public interface SpanStorage {
    * Return a list of all spans currently stored. For testing.
    */
   List<Span> getAllSpans();
+
+  /**
+   * Return a list of all spans that fall within the time given range. 
+   * @param start UNIX time (in nanoseconds) as a long
+   * @param end UNIX time (in nanoseconds) as a long
+   */
+  List<Span> getSpansInRange(long start, long end);
 }

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java Fri Aug 13 20:58:13 2010
@@ -84,12 +84,23 @@ public class TracePlugin extends RPCPlug
     public GenericArray<Span> getAllSpans() throws AvroRemoteException {
       List<Span> spans = this.spanStorage.getAllSpans();
       GenericData.Array<Span> out;
-      synchronized (spans) { 
-        out = new GenericData.Array<Span>(spans.size(), 
-          Schema.createArray(Span.SCHEMA$));
-        for (Span s: spans) {
-          out.add(s);
-        }
+      out = new GenericData.Array<Span>(spans.size(), 
+        Schema.createArray(Span.SCHEMA$));
+      for (Span s: spans) {
+        out.add(s);
+      }
+      return out;
+    }
+
+    @Override
+    public GenericArray<Span> getSpansInRange(long start, long end)
+        throws AvroRemoteException {
+      List<Span> spans = this.spanStorage.getSpansInRange(start, end);
+      GenericData.Array<Span> out;
+      out = new GenericData.Array<Span>(spans.size(), 
+        Schema.createArray(Span.SCHEMA$));
+      for (Span s: spans) {
+        out.add(s);
       }
       return out;
     }
@@ -139,9 +150,12 @@ public class TracePlugin extends RPCPlug
       }
     };
 
-    if (storageType.equals("MEMORY")) {
+    if (storageType == StorageType.MEMORY) {
       this.storage = new InMemorySpanStorage();
     }
+    else if (storageType == StorageType.DISK) {
+      this.storage = new FileSpanStorage(false, conf);
+    }
     else { // default
       this.storage = new InMemorySpanStorage();
     }
@@ -288,6 +302,14 @@ public class TracePlugin extends RPCPlug
     }
   }
   
+  public void stopClientServer() {
+    try {
+      this.clientFacingServer.stop();
+    } catch (Exception e) {
+      // ignore
+    }
+  }
+  
   /**
    * Start a client-facing server. Can be overridden if users
    * prefer to attach client Servlet to their own server. 

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java Fri Aug 13 20:58:13 2010
@@ -30,6 +30,12 @@ public class TracePluginConfiguration {
   public StorageType storageType;  // How to store spans
   public long maxSpans;   // Max number of spans to store
   public boolean enabled; // Whether or not we are active
+  public boolean buffer;  // If disk storage, whether to buffer writes
+  public int compressionLevel; // If using file storage, what compression
+                               // level (0-9).
+  public int fileGranularitySeconds; // How many seconds of span data to store
+                                     // in each file.
+  public String spanStorageDir; // where to store span data, if file-based
   
   /**
    * Return a TracePluginConfiguration with default options.
@@ -41,5 +47,9 @@ public class TracePluginConfiguration {
     this.storageType = StorageType.MEMORY;
     this.maxSpans = 10000;
     this.enabled = true;
+    this.buffer = true;
+    this.compressionLevel = 9;
+    this.fileGranularitySeconds = 500;
+    this.spanStorageDir = "/tmp";
   }
 }

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java Fri Aug 13 20:58:13 2010
@@ -53,7 +53,6 @@ class Util {
     return foundEvents;
   }
   
-  
   /**
    * Get the size of an RPC payload.
    */
@@ -166,4 +165,25 @@ class Util {
     return Arrays.equals(a.bytes(), b.bytes());
   }
   
+  /**
+   * Tests if a span occurred between start and end.
+   */
+  public static boolean spanInRange(Span s, long start, long end) {
+    long startTime = 0;
+    long endTime = 0;
+    
+    for (TimestampedEvent e: s.events) {
+      if (e.event instanceof SpanEvent) {
+        SpanEvent ev = (SpanEvent) e.event;
+        switch (ev) {
+          case CLIENT_SEND: startTime = e.timeStamp;
+          case SERVER_RECV: startTime = e.timeStamp;
+          case CLIENT_RECV: endTime = e.timeStamp;
+          case SERVER_SEND: endTime = e.timeStamp;
+        }      
+      }
+    }
+    if (startTime > start && endTime < end) { return true; }
+    return false;
+  }
 }

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java Fri Aug 13 20:58:13 2010
@@ -37,6 +37,7 @@ import org.apache.avro.ipc.RPCPlugin;
 import org.apache.avro.ipc.Responder;
 import org.apache.avro.ipc.trace.SpanAggregator.SpanAggregationResults;
 import org.apache.avro.ipc.trace.SpanAggregator.TraceFormationResults;
+import org.apache.avro.ipc.trace.TracePlugin.StorageType;
 import org.junit.Test;
 
 /**
@@ -137,10 +138,23 @@ public class TestEndToEndTracing {
       return currentCount + 1;
     }
   }
-
+  
+  @Test
+  public void testTraceAndCollectionMemory() throws Exception {
+    TracePluginConfiguration conf = new TracePluginConfiguration();
+    conf.storageType = StorageType.MEMORY;
+    testTraceAndCollection(conf);
+  }  
+  
   @Test
-  public void testTraceAndCollection() throws Exception {
+  public void testTraceAndCollectionDisk() throws Exception {
     TracePluginConfiguration conf = new TracePluginConfiguration();
+    conf.storageType = StorageType.DISK;
+    conf.buffer = false;
+    testTraceAndCollection(conf);
+  }
+  
+  public void testTraceAndCollection(TracePluginConfiguration conf) throws Exception {
     conf.traceProb = 1.0;
     conf.port = 51010;
     conf.clientPort = 12346;
@@ -182,7 +196,7 @@ public class TestEndToEndTracing {
         advancedProtocol.getMessages().get("w").getRequest());
     params.put("req", 1);
     r.request("w", params);
-    
+    Thread.sleep(1000);
     ArrayList<Span> allSpans = new ArrayList<Span>();
     
     allSpans.addAll(aPlugin.storage.getAllSpans());

Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestFileSpanStorage.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestFileSpanStorage.java?rev=985363&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestFileSpanStorage.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestFileSpanStorage.java Fri Aug 13 20:58:13 2010
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc.trace;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+
+/**
+ * Unit tests for { @link FileSpanStorage }.
+ */
+public class TestFileSpanStorage {
+  
+  @Test
+  public void testBasicStorage() {
+    TracePluginConfiguration conf = new TracePluginConfiguration();
+    FileSpanStorage test = new FileSpanStorage(false, conf);
+    Span s = Util.createEventlessSpan(Util.idValue(1), Util.idValue(1), null);
+    s.messageName = new Utf8("message");
+    test.addSpan(s);
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    assertTrue(test.getAllSpans().contains(s));
+    test.clear();
+  }
+  
+  @Test
+  public void testTonsOfSpans() {
+    TracePluginConfiguration conf = new TracePluginConfiguration();
+    FileSpanStorage test = new FileSpanStorage(false, conf);
+    test.setMaxSpans(100000);
+    List<Span> spans = new ArrayList<Span>(50000);
+    for (int i = 0; i < 50000; i++) {
+      Span s = Util.createEventlessSpan(Util.idValue(i), Util.idValue(i), null);
+      s.messageName = new Utf8("message");
+      test.addSpan(s);
+      spans.add(s);
+    }
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    assertEquals(50000, test.getAllSpans().size());
+
+    // Test fewer spans but explicitly call containsAll
+    TracePluginConfiguration conf2 = new TracePluginConfiguration();
+    FileSpanStorage test2 = new FileSpanStorage(false, conf2);
+    test.setMaxSpans(100000);
+    spans.clear();
+    for (int i = 0; i < 5000; i++) {
+      Span s = Util.createEventlessSpan(Util.idValue(i), Util.idValue(i), null);
+      s.messageName = new Utf8("message");
+      test2.addSpan(s);
+      spans.add(s);
+    }
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    assertTrue(test.getAllSpans().containsAll(spans));
+    test.clear();
+    test2.clear();
+  }
+  
+  @Test
+  public void testBasicMaxSpans() {
+    TracePluginConfiguration conf = new TracePluginConfiguration();
+    FileSpanStorage test = new FileSpanStorage(false, conf);
+    test.setMaxSpans(10);
+    
+    // Add a bunch of spans
+    for (int i = 0; i < 100; i++) {
+      Span s = Util.createEventlessSpan(Util.idValue(i), Util.idValue(i), null);
+      s.messageName = new Utf8("message");
+      test.addSpan(s);
+    }
+    
+    List<Span> lastNine = new LinkedList<Span>();
+    for (int i = 0; i < 9; i++) {
+      Span s = Util.createEventlessSpan(Util.idValue(100 + i), Util.idValue(100 + i), null);
+      s.messageName = new Utf8("message");
+      lastNine.add(s);
+      test.addSpan(s);
+    }
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    List<Span> retreived = test.getAllSpans();
+    assertEquals(9, retreived.size());
+    assertTrue(retreived.containsAll(lastNine));
+    
+    test.clear();
+  }
+  
+  @Test
+  public void testRangeQuery1() {
+    TracePluginConfiguration conf = new TracePluginConfiguration();
+    conf.fileGranularitySeconds = 1;
+    FileSpanStorage test = new FileSpanStorage(false, conf);
+    test.setMaxSpans(10000);
+    
+    long cutOff1 = 0;
+    long cutOff2 = 0;
+    
+    int numSpans = 10;
+    
+    Span[] spans = new Span[numSpans];
+    // Add some spans
+    for (int i = 0; i < numSpans; i++) {
+      if (i == 1) { cutOff1 = (System.currentTimeMillis() - 20) * 1000000; }
+      
+      Span s = Util.createEventlessSpan(Util.idValue(i), Util.idValue(i), null);
+      TimestampedEvent te1 = new TimestampedEvent();
+      te1.timeStamp = System.currentTimeMillis() * 1000000;
+      te1.event = SpanEvent.CLIENT_SEND;
+      
+      TimestampedEvent te2 = new TimestampedEvent();
+      te2.timeStamp = System.currentTimeMillis() * 1000000;
+      te2.event = SpanEvent.CLIENT_RECV;
+      s.events.add(te1);
+      s.events.add(te2);
+      
+      s.messageName = new Utf8("message");
+      test.addSpan(s);
+      spans[i] = s;
+      
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+      
+      }
+      if (i == numSpans - 2) {
+        cutOff2 = (System.currentTimeMillis() - 20) * 1000000; 
+      }
+    }
+    
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException e) {
+    }
+    
+    List<Span> retrieved = test.getSpansInRange(cutOff1, cutOff2);
+    assertEquals(numSpans - 2, retrieved.size());
+    
+    assertFalse(retrieved.contains(spans[0]));
+    for (int j=1; j < numSpans - 2; j++) {
+      assertTrue(retrieved.contains(spans[j]));
+    }
+    assertFalse(retrieved.contains((spans[spans.length - 1])));
+    
+    test.clear();
+  }
+}

Modified: avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl
URL: http://svn.apache.org/viewvc/avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl (original)
+++ avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl Fri Aug 13 20:58:13 2010
@@ -33,5 +33,14 @@ protocol AvroTrace {
     boolean complete; // Whether includes data from both sides
   }
 
+  /**
+   * Get all spans stored on this host.
+   */
   array<Span> getAllSpans();
+
+  /**
+   * Get spans occuring between start and end. Each is a unix timestamp
+   * in nanosecond units (for consistency with TimestampedEvent).
+   */
+  array<Span> getSpansInRange(long start, long end);
 }

Modified: avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr
URL: http://svn.apache.org/viewvc/avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr?rev=985363&r1=985362&r2=985363&view=diff
==============================================================================
--- avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr (original)
+++ avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr Fri Aug 13 20:58:13 2010
@@ -64,6 +64,19 @@
         "type" : "array",
         "items" : "Span"
       }
+    },
+    "getSpansInRange" : {
+      "request" : [ {
+        "name" : "start",
+        "type" : "long"
+      }, {
+        "name" : "end",
+        "type" : "long"
+      } ],
+      "response" : {
+        "type" : "array",
+        "items" : "Span"
+      }
     }
   }
-}
\ No newline at end of file
+}