You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ph...@apache.org on 2010/08/05 02:07:25 UTC

svn commit: r982434 [1/2] - in /avro/trunk: ./ lang/java/src/java/org/apache/avro/ipc/ lang/java/src/java/org/apache/avro/ipc/trace/ lang/java/src/test/java/org/apache/avro/ipc/trace/ share/schemas/org/apache/avro/ipc/trace/

Author: philz
Date: Thu Aug  5 00:07:24 2010
New Revision: 982434

URL: http://svn.apache.org/viewvc?rev=982434&view=rev
Log:
AVRO-595. Add Basic Trace Collection and Propagation.
(Patrick Wendell via philz)


Added:
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanAggregator.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Trace.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceClientServlet.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceNode.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/
    avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestBasicTracing.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanAggregation.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanTraceFormation.java
    avro/trunk/share/schemas/org/apache/avro/ipc/trace/
    avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl
    avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=982434&r1=982433&r2=982434&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Aug  5 00:07:24 2010
@@ -13,6 +13,9 @@ Avro 1.4.0 (unreleased)
 
   NEW FEATURES
 
+    AVRO-595. Add Basic Trace Collection and Propagation.
+    (Patrick Wendell via philz)
+
     AVRO-493. Add support for Hadoop Mapreduce with Avro data files. (cutting)
 
     AVRO-285: Specify one-way messages and implement in Java. (cutting)

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java?rev=982434&r1=982433&r2=982434&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java Thu Aug  5 00:07:24 2010
@@ -102,15 +102,15 @@ public abstract class Requestor {
       writeRequest(m.getRequest(), request, out); // write request payload
       List<ByteBuffer> payload = bbo.getBufferList();
       
+      writeHandshake(out);                       // prepend handshake if needed
+      META_WRITER.write(context.requestCallMeta(), out);
+      out.writeString(m.getName());               // write message name
+      
       context.setRequestPayload(payload);
       for (RPCPlugin plugin : rpcMetaPlugins) {
         plugin.clientSendRequest(context);        // get meta-data from plugins
       }
       
-      writeHandshake(out);                       // prepend handshake if needed
-      META_WRITER.write(context.requestCallMeta(), out);
-      out.writeString(m.getName());               // write message name
-      
       bbo.append(payload);
       
       List<ByteBuffer> requestBytes = bbo.getBufferList();

Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java Thu Aug  5 00:07:24 2010
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc.trace;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Example implementation of SpanStorage which keeps spans in memory.
+ * 
+ * This is designed as a prototype for demonstration and testing. It should
+ * not be used in production settings.
+ *
+ */
+public class InMemorySpanStorage implements SpanStorage {
+  private static final long DEFAULT_MAX_SPANS = 10000;
+  
+  protected LinkedList<Span> spans;
+  private long maxSpans;
+
+  public InMemorySpanStorage() {
+    this.spans = new LinkedList<Span>();
+    this.maxSpans = DEFAULT_MAX_SPANS;
+  }
+  
+  @Override
+  public void addSpan(Span s) {
+    synchronized (this.spans) {  
+      this.spans.add(s);
+      if (this.spans.size() > this.maxSpans) {
+        this.spans.removeFirst();
+      }
+    }
+  }
+
+  @Override
+  public void setMaxSpans(long maxSpans) {
+    this.maxSpans = maxSpans;
+    
+    synchronized (this.spans) {
+      while (this.spans.size() > maxSpans) {
+        this.spans.removeFirst();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public List<Span> getAllSpans() {
+    return (LinkedList<Span>) this.spans.clone();
+  }
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanAggregator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanAggregator.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanAggregator.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanAggregator.java Thu Aug  5 00:07:24 2010
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import static org.apache.avro.ipc.trace.Util.idsEqual;
+import static org.apache.avro.ipc.trace.Util.longValue;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.emory.mathcs.backport.java.util.Arrays;
+
+/**
+ * Utility methods for aggregating spans together at various
+ * points of trace analysis.
+ */
+public class SpanAggregator {
+  /**
+   * Class to store the results of span aggregation.
+   */
+  public static class SpanAggregationResults {
+    /** Spans which have data from client and server. */
+    public List<Span> completeSpans;
+    
+    /** Spans which have data only from client or server, or in which
+     * an ID collision was detected.*/
+    public List<Span> incompleteSpans;
+    
+    public SpanAggregationResults() {
+      completeSpans = new LinkedList<Span>();
+      incompleteSpans = new LinkedList<Span>();
+    }
+  }
+  
+  /**
+   * Class to store the results of trace formation.
+   */
+  public static class TraceFormationResults {
+    /** Traces which were successfully created. */
+    public List<Trace> traces;
+    
+    /** Spans which did not describe a complete trace. */
+    public List<Span> rejectedSpans;
+    
+    public TraceFormationResults() {
+      traces = new LinkedList<Trace>();
+      rejectedSpans = new LinkedList<Span>();
+    }
+  }
+  
+  /**
+   * Merge a list of incomplete spans (data filled in from only client or 
+   * server) into complete spans (full client and server data). 
+   */
+  @SuppressWarnings("unchecked")
+  static SpanAggregationResults getFullSpans(List<Span> partials) {
+    SpanAggregationResults out = new SpanAggregationResults();
+    HashMap<Long, Span> seenSpans = new HashMap<Long, Span>();
+    List<SpanEvent> allEvents = (List<SpanEvent>) Arrays.asList(
+        SpanEvent.values());
+    
+    for (Span s: partials) {
+      EnumSet<SpanEvent> foundEvents = Util.getAllEvents(s);
+      
+      // Span has complete data already
+      if (foundEvents.containsAll(allEvents)) {
+        out.completeSpans.add(s);
+      } 
+      // We haven't seen other half yet
+      else if (!seenSpans.containsKey(
+          Util.longValue(s.spanID))) {
+        seenSpans.put(Util.longValue(s.spanID), s);
+      }    
+      // We have seen other half
+      else {
+        Span other = seenSpans.remove(Util.longValue(s.spanID));  
+        if (!other.messageName.equals(s.messageName) ||
+            !idsEqual(other.parentSpanID, s.parentSpanID)) {
+          out.incompleteSpans.add(s);
+          out.incompleteSpans.add(other);
+        } else {
+          foundEvents.addAll(Util.getAllEvents(other));
+          if (other.requestorHostname != null) {
+            s.requestorHostname = other.requestorHostname;
+          }
+          if (other.responderHostname != null) {
+            s.responderHostname = other.responderHostname;
+          }
+          
+          // We have a complete span between the two
+          if (foundEvents.containsAll(allEvents)) {
+            for (TimestampedEvent event: other.events) {
+              s.events.add(event);
+            }
+          }
+          s.complete = true;
+          out.completeSpans.add(s);
+        }
+      }
+    }
+    
+    // Flush unmatched spans
+    for (Span s: seenSpans.values()) {
+      out.incompleteSpans.add(s);
+    }
+    return out;
+  }
+  
+  /**
+   * Given a list of Spans extract as many Trace objects as possible.
+   * A {@link Trace} is a tree-like data structure containing spans.
+   */
+  static TraceFormationResults getTraces(List<Span> spans) {
+    /** Traces indexed by traceID. */
+    HashMap<Long, List<Span>> traces = new HashMap<Long, List<Span>>();
+    
+    for (Span s: spans) {
+      if (traces.get(longValue(s.traceID)) == null) {
+         traces.put(longValue(s.traceID), new ArrayList<Span>());
+      }
+      traces.get(longValue(s.traceID)).add(s);
+    }    
+    
+    TraceFormationResults out = new TraceFormationResults();
+    
+    for (List<Span> spanSet : traces.values()) {
+       Trace trace = Trace.extractTrace(spanSet);
+       if (trace != null) {
+         out.traces.add(trace);
+       }
+       else {
+         out.rejectedSpans.addAll(spanSet);
+       }
+    } 
+    return out;
+  }
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java Thu Aug  5 00:07:24 2010
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc.trace;
+
+import java.util.List;
+
+/**
+ * Responsible for storing spans locally and answering span queries.
+ * 
+ * Since query for a given set of spans may persist over several RPC
+ * calls, they are indexed by a handle.
+ *
+ */
+public interface SpanStorage {
+  /**
+   * Add a span. 
+   * @param s
+   */
+  void addSpan(Span s);
+  
+  /**
+   * Set the maximum number of spans to have in storage at any given time.
+   */
+  void setMaxSpans(long maxSpans);
+  
+  /**
+   * Return a list of all spans currently stored. For testing.
+   */
+  List<Span> getAllSpans();
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Trace.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Trace.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Trace.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Trace.java Thu Aug  5 00:07:24 2010
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Collections;
+
+/**
+ * A Trace is a tree of spans which reflects the actual call structure of a 
+ * recursive RPC call tree. Each node in a Trace represents a RPC 
+ * request/response pair. Each node also has zero or more child nodes.
+ */
+public class Trace {  
+  private TraceNode root;
+
+  /**
+   * Construct a trace given a root TraceNode.
+   */
+  public Trace(TraceNode root) {
+    this.root = root;
+  }
+  
+  /**
+   * Return the root node of this trace.
+   */
+  public TraceNode getRoot() {
+    return this.root;
+  }
+ 
+  /**
+   * Provide a hashCode unique to the execution path of this trace.
+   * 
+   * This is useful for grouping several traces which represent the same
+   * execution path (for instance, when we want to calculate averages for a
+   * large number of identical traces).
+   */
+  public int executionPathHash() {
+    // The string representation will be unique to a call tree, so we
+    // can borrow the hashCode from that string.
+    return this.printBrief().hashCode();
+  }
+
+  private class NodeComparator implements Comparator<TraceNode> {
+    @Override
+    public int compare(TraceNode tn0, TraceNode tn1) {
+      // We sort nodes alphabetically by the message name
+      int result = tn0.span.messageName.compareTo(tn1.span.messageName);
+      
+      if (result != 0) {
+        return result;
+      }
+      /* NOTE:
+       * If two spans containing the *same* RPC message share a parent, we need
+       * a way to consistently order them. Here, we use the send time to
+       * break ties. This will only work deterministically for non-async
+       * clients. For asynchronous clients, aggregated statistics based on this 
+       * ordering may be incorrect, since we have no way to disambiguate one
+       * function call from another. 
+       */
+      else {
+        Long tn0SendTime = tn0.extractEventTime(tn0, SpanEvent.CLIENT_SEND);
+        Long tn1SendTime = tn1.extractEventTime(tn1, SpanEvent.CLIENT_SEND);
+        
+        return tn0SendTime.compareTo(tn1SendTime);
+      }
+    }
+  }
+  
+  /**
+   * Print a brief description of this trace describing the execution
+   * path, but not timing data. This is for debugging or quickly profiling
+   * traces.
+   * 
+   * For instance the trace:
+   *     x
+   *    /
+   *   w
+   *    \
+   *     y--z
+   *     
+   * is encoded as:
+   * (w (x) (y (z)))
+   */
+  public String printBrief() {
+    if (this.root == null) { return "Trace: <empty>"; }
+    String out = "Trace: (";
+    out += this.root.span.messageName + " ";
+    out += printBriefRecurse(root.children);
+    out += ")";
+    return out;
+  }
+  
+  private String printBriefRecurse(List<TraceNode> children) {
+    String out = "";
+    // We sort so equivalent traces always print identically 
+    Collections.sort(children, new NodeComparator());
+    for (int i = 0; i < children.size(); i++) {
+      TraceNode tn = children.get(i);
+      out += "(" + tn.span.messageName;
+      if (tn.children.size() > 0) {
+        out += " ";
+        out += printBriefRecurse(tn.children);
+      }
+      out += ")";
+      if (i != children.size() - 1) {
+        out += " ";
+      }
+    }
+    return out;
+  }
+  
+  /**
+   * Print a description of this trace which includes timing data. This is for 
+   * debugging or quickly profiling traces.
+   * 
+   * For instance the trace:
+   *     x
+   *    /
+   *   w
+   *    \
+   *     x
+   *     
+   * Might print as:
+   * w 87ms
+   *  x 10ms
+   *  x 2ms
+   */
+  public String printWithTiming() {
+    if (this.root == null) { return "Trace: <empty>"; }
+    String out = "Trace: " + "\n";
+    List<TraceNode> rootList = new LinkedList<TraceNode>();
+    rootList.add(this.root);
+    out += printWithTimingRecurse(rootList, 0);
+    return out;
+  }
+  
+  private String printWithTimingRecurse(List<TraceNode> children, int depth) {
+    String out = "";
+    // We sort so equivalent traces always print identically 
+    Collections.sort(children, new NodeComparator());
+    for (TraceNode tn : children) {
+      long clientSend = 0;
+      long clientReceive = 0;
+      for (TimestampedEvent te: tn.span.events) {
+        if (te.event instanceof SpanEvent) {
+          SpanEvent ev = (SpanEvent) te.event;
+          if (ev.equals(SpanEvent.CLIENT_RECV)) {
+            clientReceive = te.timeStamp / 1000000;
+          } else if (ev.equals(SpanEvent.CLIENT_SEND)) {
+            clientSend = te.timeStamp / 1000000;
+          }
+        }
+      }
+      
+      for (int i = 0; i < depth; i++) { out = out + "  "; } // indent
+      out += tn.span.messageName + " " + (clientReceive - clientSend) + "ms\n";
+      if (tn.children.size() > 0) {
+        out += printWithTimingRecurse(tn.children, depth + 1);
+      }
+    }
+
+    return out;
+  }
+  
+  /**
+   * Construct a Trace from a list of Span objects. If no such trace
+   * can be created (if the list does not describe a complete trace)
+   * returns null.
+   */
+  public static Trace extractTrace(List<Span> spans) {
+    /**
+     * Map of span id's to a list of child span id's
+     */
+    HashMap<Long, List<Long>> children = new HashMap<Long, List<Long>>();
+    
+    /**
+     * Map of span id's to spans
+     */
+    HashMap<Long, Span> spanRef = new HashMap<Long, Span>();
+    
+    /**
+     * Root span
+     */
+    Span rootSpan = null;
+    
+    for (Span s: spans) {
+      spanRef.put(Util.longValue(s.spanID), s);
+      if (s.parentSpanID == null) {
+        rootSpan = s;
+      } else {
+        if (children.get(Util.longValue(s.parentSpanID)) == null) {
+          LinkedList<Long> list = new LinkedList<Long>();
+          list.add(Util.longValue(s.spanID));
+          children.put(Util.longValue(s.parentSpanID), list);
+        } else {
+          children.get(Util.longValue(s.parentSpanID)).add(
+              Util.longValue(s.spanID));
+        }
+      }
+    }
+    if (rootSpan == null) { // We never found a root
+      return null;
+    }
+    TraceNode rootNode = getNode(rootSpan, spanRef, children);
+    return new Trace(rootNode);
+  }
+  
+  /**
+   * Recursive helper method to create a span tree. 
+   */
+  private static TraceNode getNode(
+      Span s, HashMap<Long, Span> spanRef, HashMap<Long, List<Long>> children) {
+    TraceNode out = new TraceNode();
+    out.span = s;
+    out.children = new LinkedList<TraceNode>();
+    
+    List<Long> kids = children.get(Util.longValue(s.spanID));
+    if (kids == null) { return out; } // no children (base case) 
+    
+    for (long childID: kids) {
+      Span childSpan = spanRef.get(childID);
+      if (childSpan == null) { return null; } // invalid span reference
+      out.children.add(getNode(childSpan, spanRef, children));
+    }
+    return out;
+  }
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceClientServlet.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceClientServlet.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceClientServlet.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceClientServlet.java Thu Aug  5 00:07:24 2010
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+public class TraceClientServlet extends HttpServlet {
+  public void doPost(HttpServletRequest request, 
+    HttpServletResponse response)
+    throws ServletException, IOException {
+    response.setContentType("text/html");
+    PrintWriter out = response.getWriter();
+
+    out.println("<title>Example</title> <body bgcolor=FFFFFF>");
+    out.println("<h2>Button Clicked</h2>");
+
+    String servers = request.getParameter("servers");
+
+    if(servers != null){
+      String splitToken = System.getProperty("line.separator");
+      if (splitToken == null) {
+        splitToken = "\n";
+      }
+      String[] parts = servers.split(splitToken);
+      for (String p : parts) {
+        out.println(p + "<br>");
+      }
+    } else {
+       out.println("No text entered.");
+    }
+  }
+  
+  public void doGet(HttpServletRequest request, 
+      HttpServletResponse response) throws IOException {
+    response.setContentType("text/html");
+    PrintWriter out = response.getWriter();
+    
+    out.println("<html>");
+    out.println("<head>Form</head>");
+    out.println("<body><form method='post'>");
+    out.println("<textarea name='servers'></textarea>");
+    out.println("<input type='submit' name='submit'>");
+    out.println("</form>");
+    out.println("</html>");
+  }
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceNode.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceNode.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceNode.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceNode.java Thu Aug  5 00:07:24 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import java.util.List;
+
+/**
+ * A node of of an RPC {@link Trace}. Each node stores a {@link Span} object
+ * and a list of zero or more child nodes.
+ */
+class TraceNode {
+  /**
+   * The {@link Span} to which corresponds to this node in the call tree.
+   */
+  public Span span;
+  
+  /**
+   * A list of this TraceNode's children.
+   */
+  public List<TraceNode> children;
+
+  public TraceNode(Span span, List<TraceNode> children) {
+    this.span = span;
+    this.children = children;
+  }
+  
+  public TraceNode() {
+    
+  }
+ 
+  /** 
+   * Return the time stamp associated with a particular SpanEvent in this
+   * Trace Node. Return -1 if the TraceNode's Span did not contain that event.
+   */
+  public long extractEventTime(TraceNode tn, SpanEvent e) {
+    for (TimestampedEvent te: tn.span.events) {
+      if ((te.event instanceof SpanEvent) && 
+          (SpanEvent) te.event == e) {
+        return te.timeStamp;
+      }
+    }
+    return -1;
+  }
+ 
+  /**
+   * Return time delta between { @link SpanEvent.CLIENT_SEND } and 
+   * { @link SpanEvent.SERVER_RECV }. This may be negative or zero in the 
+   * case of clock skew.
+   */
+  public long getPreNetworkTime() {
+    long clientSend = extractEventTime(this, SpanEvent.CLIENT_SEND);
+    long serverReceive = extractEventTime(this, SpanEvent.SERVER_RECV);
+    
+    return serverReceive - clientSend;
+  }
+  
+  /**
+   * Return time delta between { @link SpanEvent.SERVER_SEND } and 
+   * { @link SpanEvent.CLIENT_RECV }. This may be negative or zero in the 
+   * case of clock skew.
+   */
+  public long getPostNetworkTime() {
+    long serverSend = extractEventTime(this, SpanEvent.SERVER_SEND);
+    long clientReceive = extractEventTime(this, SpanEvent.CLIENT_RECV);
+    
+    return clientReceive - serverSend;
+  }
+  
+  /**
+   * Return time delta between { @link SpanEvent.SERVER_RECV } and 
+   * { @link SpanEvent.SERVER_SEND}.
+   */
+  public long getProcessTime() {
+    long serverReceive = extractEventTime(this, SpanEvent.SERVER_RECV);
+    long serverSend = extractEventTime(this, SpanEvent.SERVER_SEND);
+    
+    return serverSend - serverReceive;
+  } 
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java Thu Aug  5 00:07:24 2010
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc.trace;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.RPCContext;
+import org.apache.avro.ipc.RPCPlugin;
+import org.apache.avro.specific.SpecificResponder;
+import org.apache.avro.util.Utf8;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.bio.SocketConnector;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A tracing plugin for Avro.
+ * 
+ * This plugin traces RPC call timing and follows nested trees of RPC
+ * calls. To use, instantiate and add to an existing Requestor or
+ * Responder. If you have a Responder that itself acts as an RPC client (i.e.
+ * it contains a Requestor) be sure to pass the same instance of the 
+ * plugin to both components so that propagation occurs correctly.
+ * 
+ * Currently, propagation only works if each requester is performing
+ * serial RPC calls. That is, we cannot support the case in which the
+ * Requestor's request does not occur in the same thread as the Responder's
+ * response. 
+ */
+public class TracePlugin extends RPCPlugin {
+  final private static Random RANDOM = new Random();
+  private static final Logger LOG = LoggerFactory.getLogger(TracePlugin.class);
+  public static enum StorageType { MEMORY, DISK };
+  
+  /*
+   * This plugin uses three key/value meta-data pairs. The value type for all
+   * of these pairs is fixed(8) and they are assumed to be encoded as 8-byte
+   * ID's. The presence of a TRACE_ID_KEY and a SPAN_ID_KEY in a message
+   * signals that tracing is in progress. The optional PARENT_SPAN_ID_KEY
+   * signals that this message has a parent node in the RPC call tree. 
+   */
+  private static final Utf8 TRACE_ID_KEY = new Utf8("traceID");
+  private static final Utf8 SPAN_ID_KEY = new Utf8("spanID");
+  private static final Utf8 PARENT_SPAN_ID_KEY = new Utf8("parentSpanID");
+  
+
+  class TraceResponder implements AvroTrace {
+    private SpanStorage spanStorage;
+    
+    public TraceResponder(SpanStorage spanStorage) {
+      this.spanStorage = spanStorage;
+    }
+
+    @Override
+    public GenericArray<Span> getAllSpans() throws AvroRemoteException {
+      List<Span> spans = this.spanStorage.getAllSpans();
+      GenericData.Array<Span> out;
+      synchronized (spans) { 
+        out = new GenericData.Array<Span>(spans.size(), 
+          Schema.createArray(Span.SCHEMA$));
+        for (Span s: spans) {
+          out.add(s);
+        }
+      }
+      return out;
+    }
+  }
+  
+  private double traceProb; // Probability of starting tracing
+  private int port;         // Port to serve tracing data
+  private int clientPort;   // Port to expose client HTTP interface
+  private StorageType storageType;  // How to store spans
+  private long maxSpans; // Max number of spans to store
+  private boolean enabled; // Whether to participate in tracing
+
+  private ThreadLocal<Span> currentSpan; // span in which we are server
+  private ThreadLocal<Span> childSpan;   // span in which we are client
+  
+  // Storage and serving of spans
+  protected SpanStorage storage;
+  protected HttpServer httpServer;
+  protected SpecificResponder responder;
+  
+  // Client interface
+  protected Server clientFacingServer;
+  
+  public TracePlugin(TracePluginConfiguration conf) throws IOException {
+    traceProb = conf.traceProb;
+    port = conf.port;
+    clientPort = conf.clientPort;
+    storageType = conf.storageType;
+    maxSpans = conf.maxSpans;
+    enabled = conf.enabled;
+    
+    // check bounds
+    if (!(traceProb >= 0.0 && traceProb <= 1.0)) { traceProb = 0.0; }
+    if (!(port > 0 && port < 65535)) { port = 51001; }
+    if (!(clientPort > 0 && clientPort < 65535)) { clientPort = 51200; }
+    if (maxSpans < 0) { maxSpans = 5000; }
+    
+    currentSpan = new ThreadLocal<Span>(){
+      @Override protected Span initialValue(){
+          return null;
+      }
+    };
+    
+    childSpan = new ThreadLocal<Span>(){
+      @Override protected Span initialValue(){
+          return null;
+      }
+    };
+
+    if (storageType.equals("MEMORY")) {
+      this.storage = new InMemorySpanStorage();
+    }
+    else { // default
+      this.storage = new InMemorySpanStorage();
+    }
+    
+    this.storage.setMaxSpans(maxSpans);
+    
+    // Start serving span data
+    responder = new SpecificResponder(
+        AvroTrace.PROTOCOL, new TraceResponder(this.storage)); 
+    
+    boolean bound = false;
+    
+    while (!bound) {
+      // rather than die if port is taken, try to fail over to another port.
+      try {
+        httpServer = new HttpServer(responder, this.port);
+        bound = true;
+      } catch (AvroRuntimeException e) {
+        if (e.getCause() instanceof BindException) {
+          LOG.error("Failed to bind to port: " + this.port);
+          this.port = this.port +1;
+        } else {
+          throw e;
+        }
+      }
+    }
+    
+    // Start client-facing servlet
+    initializeClientServer();
+  }
+  
+  @Override
+  public void clientStartConnect(RPCContext context) {
+    // There are two cases in which we will need to seed a trace
+    // (1) If we probabilistically decide to seed a new trace
+    // (2) If we are part of an existing trace
+    
+    if ((this.currentSpan.get() == null) && 
+        (RANDOM.nextFloat() < this.traceProb) && enabled) {
+      // Start new trace
+      Span span = Util.createEventlessSpan(null, null, null);
+      this.childSpan.set(span);
+    }
+    
+    if ((this.currentSpan.get() != null) && enabled) {
+      Span currSpan = this.currentSpan.get();
+      Span span = Util.createEventlessSpan(
+          currSpan.traceID, null, currSpan.spanID);   
+      this.childSpan.set(span);
+    }
+    
+    if (this.childSpan.get() != null) {
+      Span span = this.childSpan.get();
+      context.requestHandshakeMeta().put(
+          TRACE_ID_KEY, ByteBuffer.wrap(span.traceID.bytes()));
+      context.requestHandshakeMeta().put(
+          SPAN_ID_KEY, ByteBuffer.wrap(span.spanID.bytes()));
+      if (span.parentSpanID != null) {
+        context.requestHandshakeMeta().put(
+            PARENT_SPAN_ID_KEY, ByteBuffer.wrap(span.parentSpanID.bytes())); 
+      }
+    }
+  }
+  
+  @Override
+  public void serverConnecting(RPCContext context) {
+    Map<Utf8, ByteBuffer> meta = context.requestHandshakeMeta();
+    // Are we being asked to propagate a trace?
+    if (meta.containsKey(TRACE_ID_KEY) && enabled) {
+      if (!(meta.containsKey(SPAN_ID_KEY))) {
+        LOG.warn("Span ID missing for trace " +
+            meta.get(TRACE_ID_KEY).toString());
+        return; // should have been given full span data
+      }
+      byte[] spanIDBytes = new byte[8];
+      meta.get(SPAN_ID_KEY).get(spanIDBytes);
+      ID spanID = new ID();
+      spanID.bytes(spanIDBytes);
+      
+      ID parentSpanID = null;
+      if (meta.get(PARENT_SPAN_ID_KEY) != null) {
+        parentSpanID = new ID();
+        parentSpanID.bytes(meta.get(PARENT_SPAN_ID_KEY).array());
+      }
+      ID traceID = new ID();
+      traceID.bytes(meta.get(TRACE_ID_KEY).array());
+      
+      Span span = Util.createEventlessSpan(traceID, spanID, parentSpanID);
+      
+      span.events = new GenericData.Array<TimestampedEvent>(
+          100, Schema.createArray(TimestampedEvent.SCHEMA$));
+      this.currentSpan.set(span);
+    }
+  }
+  
+  @Override
+  public void clientFinishConnect(RPCContext context) { }
+
+  @Override
+  public void clientSendRequest(RPCContext context) {
+    if (this.childSpan.get() != null) {
+      Span child = this.childSpan.get();
+      Util.addEvent(child, SpanEvent.CLIENT_SEND);
+      child.messageName = new Utf8(
+          context.getMessage().getName());
+      child.requestPayloadSize = Util.getPayloadSize(
+          context.getRequestPayload());
+    }
+  }
+ 
+  @Override
+  public void serverReceiveRequest(RPCContext context) {
+    if (this.currentSpan.get() != null) {
+      Span current = this.currentSpan.get();
+      Util.addEvent(current, SpanEvent.SERVER_RECV);
+      current.messageName = new Utf8(
+          context.getMessage().getName());
+      current.requestPayloadSize = Util.getPayloadSize(
+          context.getRequestPayload());
+    }
+  }
+  
+  @Override
+  public void serverSendResponse(RPCContext context) {
+    if (this.currentSpan.get() != null) {
+      Span current = this.currentSpan.get();
+      Util.addEvent(current, SpanEvent.SERVER_SEND);
+      current.responsePayloadSize = 
+        Util.getPayloadSize(context.getResponsePayload());
+      this.storage.addSpan(this.currentSpan.get());
+      this.currentSpan.set(null);
+    }
+  }
+  
+  @Override
+  public void clientReceiveResponse(RPCContext context) {
+    if (this.childSpan.get() != null) {
+      Span child = this.childSpan.get();
+      Util.addEvent(child, SpanEvent.CLIENT_RECV);
+      child.responsePayloadSize = 
+        Util.getPayloadSize(context.getResponsePayload());
+      this.storage.addSpan(this.childSpan.get());
+      this.childSpan.set(null);
+    }
+  }
+  
+  /**
+   * Start a client-facing server. Can be overridden if users
+   * prefer to attach client Servlet to their own server. 
+   */
+  protected void initializeClientServer() {
+    clientFacingServer = new Server();
+    Context context = new Context(clientFacingServer, "/");
+    context.addServlet(new ServletHolder(new TraceClientServlet()), "/");
+    boolean connected = false;
+    SocketConnector socket = null;
+    
+    // Keep trying ports until we can connect
+    while (!connected) {
+      try {
+        socket = new SocketConnector();
+        socket.setPort(clientPort);
+        clientFacingServer.addConnector(socket);
+        clientFacingServer.start();
+        connected = true;
+      } catch (Exception e) {
+        if (e instanceof BindException) {
+          clientFacingServer.removeConnector(socket);
+          clientPort = clientPort + 1;
+          continue;
+        }
+        else {
+          break; // Fail silently here (this is currently unused)
+        }
+      }
+    }
+  }
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java Thu Aug  5 00:07:24 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import org.apache.avro.ipc.trace.TracePlugin.StorageType;
+
+/**
+ * Helper class for configuring Avro's {@link TracePlugin}. If you are using 
+ * a common configuration module, wrap this class with your own configuration. 
+ */
+public class TracePluginConfiguration {
+  public double traceProb; // Probability of starting tracing
+  public int port;         // Port to serve tracing data
+  public int clientPort;   // Port to expose client HTTP interface
+  public StorageType storageType;  // How to store spans
+  public long maxSpans;   // Max number of spans to store
+  public boolean enabled; // Whether or not we are active
+  
+  /**
+   * Return a TracePluginConfiguration with default options.
+   */
+  public TracePluginConfiguration() {
+    this.traceProb = 0.0;
+    this.port = 12335;
+    this.clientPort = 12345;
+    this.storageType = StorageType.MEMORY;
+    this.maxSpans = 10000;
+    this.enabled = true;
+  }
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java Thu Aug  5 00:07:24 2010
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc.trace;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
+
+/**
+ * Utility methods for common tasks in Avro tracing. Mostly consists of
+ * static methods which we can't put in auto-generated classes.
+ */
+class Util {
+  final private static Random RANDOM = new Random();
+  final private static int NANOS_PER_MILI = 1000000;
+  private static Utf8 hostname;
+  
+  /**
+   * Get all SpanEvents contained in Span s.
+   */
+  public static EnumSet<SpanEvent> getAllEvents(Span s) {
+    EnumSet<SpanEvent> foundEvents = EnumSet.noneOf(SpanEvent.class);
+    for (TimestampedEvent event: s.events) {
+      if (event.event instanceof SpanEvent) {
+        foundEvents.add((SpanEvent) event.event);
+      }
+    }
+    return foundEvents;
+  }
+  
+  
+  /**
+   * Get the size of an RPC payload.
+   */
+  public static int getPayloadSize(List<ByteBuffer> payload) {
+    if (payload == null) {
+      return 0;
+    }
+    int size = 0;
+    for (ByteBuffer bb: payload) {
+      size = size + bb.limit();
+    }
+    return size;
+  }
+  
+  /**
+   * Create a span without any events. If traceID or spanID is null, randomly
+   * generate them. If parentSpanID is null, assume this is a root span.
+   */
+  public static Span createEventlessSpan(ID traceID, ID spanID, ID parentSpanID) {
+    Span span = new Span();
+    span.complete = false;
+    
+    if (traceID == null) {
+      byte[] traceIDBytes = new byte[8];
+      RANDOM.nextBytes(traceIDBytes);
+      span.traceID = new ID();
+      span.traceID.bytes(traceIDBytes);
+    } else {
+      span.traceID = new ID();
+      span.traceID.bytes(traceID.bytes().clone());
+    }
+    
+    if (spanID == null) {
+      byte[] spanIDBytes = new byte[8];
+      RANDOM.nextBytes(spanIDBytes);
+      span.spanID = new ID();
+      span.spanID.bytes(spanIDBytes);
+    } else {
+      span.spanID = new ID();
+      span.spanID.bytes(spanID.bytes().clone());
+    }
+
+    if (parentSpanID != null) {
+      span.parentSpanID = new ID();
+      span.parentSpanID.bytes(parentSpanID.bytes().clone());
+    }
+    
+    span.events = new GenericData.Array<TimestampedEvent>(
+        10, Schema.createArray(TimestampedEvent.SCHEMA$));
+    
+    if (hostname == null) {
+      try {
+        hostname = new Utf8(InetAddress.getLocalHost().toString());
+      } catch (UnknownHostException e) {
+        hostname = new Utf8("Unknown");
+      }
+    }
+    span.requestorHostname = hostname;
+    return span;
+  }
+  
+  /**
+   * Add a TimestampedEvent to a Span using the current time. 
+   */
+  public static void addEvent(Span span, SpanEvent eventType) {
+    TimestampedEvent ev = new TimestampedEvent();
+    ev.event = eventType;
+    ev.timeStamp = System.currentTimeMillis() * NANOS_PER_MILI;
+    span.events.add(ev);
+  }
+  
+  /**
+   * Get the long value from a given ID object.
+   */
+  public static long longValue(ID in) {
+    if (in == null) { 
+      throw new IllegalArgumentException("ID cannot be null");
+    }
+    if (in.bytes() == null) {
+      throw new IllegalArgumentException("ID cannot be empty");
+    }
+    if (in.bytes().length != 8) {
+      throw new IllegalArgumentException("ID must be 8 bytes");
+    }
+    ByteBuffer buff = ByteBuffer.wrap(in.bytes());
+    return buff.getLong();
+  }
+  
+  /**
+   * Get an ID associated with a given long value. 
+   */
+  public static ID idValue(long in) {
+    byte[] bArray = new byte[8];
+    ByteBuffer bBuffer = ByteBuffer.wrap(bArray);
+    LongBuffer lBuffer = bBuffer.asLongBuffer();
+    lBuffer.put(0, in);
+    ID out = new ID();
+    out.bytes(bArray);
+    return out;
+  }
+  
+  /**
+   * Verify the equality of ID objects. Both being null references is
+   * considered equal.
+   */
+  public static boolean idsEqual(ID a, ID b) {
+    if (a == null && b == null) { return true; }
+    if (a == null || b == null) { return false; }
+    
+    return Arrays.equals(a.bytes(), b.bytes());
+  }
+  
+}

Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestBasicTracing.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestBasicTracing.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestBasicTracing.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestBasicTracing.java Thu Aug  5 00:07:24 2010
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.generic.GenericResponder;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.ipc.RPCPlugin;
+import org.apache.avro.ipc.Responder;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+public class TestBasicTracing {
+  Protocol protocol = Protocol.parse("" + "{\"protocol\": \"Minimal\", "
+      + "\"messages\": { \"m\": {"
+      + "   \"request\": [{\"name\": \"x\", \"type\": \"int\"}], "
+      + "   \"response\": \"int\"} } }");
+  Message message = protocol.getMessages().get("m");
+
+  /** Expects 0 and returns 1. */
+  static class TestResponder extends GenericResponder {
+    public TestResponder(Protocol local) {
+      super(local);
+    }
+
+    @Override
+    public Object respond(Message message, Object request)
+        throws AvroRemoteException {
+      assertEquals(0, ((GenericRecord) request).get("x"));
+      return 1;
+    }
+  }
+
+  @Test
+  public void testBasicTrace() throws Exception {
+    TracePluginConfiguration conf = new TracePluginConfiguration();
+    conf.port = 51007;
+    conf.clientPort = 12344;
+    conf.traceProb = 1.0;
+    TracePlugin responderPlugin = new TracePlugin(conf);
+    conf.port = 51008;
+    conf.clientPort = 12345;
+    TracePlugin requestorPlugin = new TracePlugin(conf);
+    
+    Responder res = new TestResponder(protocol);
+    res.addRPCPlugin(responderPlugin);
+    
+    HttpServer server = new HttpServer(res, 50000);
+    server.start();
+    
+    HttpTransceiver trans = new HttpTransceiver(
+        new URL("http://localhost:50000"));
+    
+    GenericRequestor r = new GenericRequestor(protocol, trans);
+    r.addRPCPlugin(requestorPlugin);
+    
+    GenericRecord params = new GenericData.Record(protocol.getMessages().get(
+    "m").getRequest());
+    params.put("x", 0);
+    r.request("m", params);
+    
+    List<Span> responderSpans = responderPlugin.storage.getAllSpans();
+    assertEquals(1, responderSpans.size());
+    
+    List<Span> requestorSpans = requestorPlugin.storage.getAllSpans();
+    assertEquals(1, requestorSpans.size());
+    
+    if ((responderSpans.size() == 1 && requestorSpans.size() == 1)) {
+      Span responderSpan = responderSpans.get(0);
+      Span requestorSpan = requestorSpans.get(0);
+      
+      // Check meta propagation     
+      assertEquals(null, requestorSpan.parentSpanID);
+      assertEquals(responderSpan.parentSpanID, requestorSpan.parentSpanID);
+      assertEquals(responderSpan.traceID, requestorSpan.traceID);
+      
+      // Check other data
+      assertEquals(2, requestorSpan.events.size());
+      assertEquals(2, responderSpan.events.size());
+      assertTrue("m".equals(requestorSpan.messageName.toString()));
+      assertTrue("m".equals(responderSpan.messageName.toString()));
+      assertFalse(requestorSpan.complete);
+      assertFalse(responderSpan.complete);
+    }
+    
+    server.close();
+    
+    requestorPlugin.clientFacingServer.stop();
+    requestorPlugin.httpServer.close();
+    
+    responderPlugin.clientFacingServer.stop();
+    responderPlugin.httpServer.close();
+  }
+  
+  /*
+   * Test a more complicated, recursive trace involving four agents and three
+   * spans.
+   * 
+   * Messages are x, y, z which request/return 
+   * incrementing int values (shown below).
+   * 
+   *   |-w-(1)-> |         |
+   *   |         |-x-(2)-> | C
+   *   |         | <-x-(3)-|        
+   *   |         |    
+   * A |       B |    
+   *   |         |         |
+   *   |         |-x-(4)-> | 
+   *   |         | <-x-(5)-| D   
+   *   |         |         | 
+   *   |<-w-(6)- |         |
+   *   
+   *   Listening ports are B: 21005
+   *                       C: 21006  
+   *                       D: 21007
+   */
+  
+  Protocol advancedProtocol = Protocol.parse("{\"protocol\": \"Advanced\", "
+      + "\"messages\": { " 
+      +  "\"w\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], "
+      + "   \"response\": \"int\"},"
+      + "\"x\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], "
+      + "   \"response\": \"int\"},"
+      + "\"y\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], "
+      + "   \"response\": \"int\"}"
+      + " } }");
+
+  /** Middle Responder */
+  static class RecursingResponder extends GenericResponder {
+    HttpTransceiver transC;
+    HttpTransceiver transD;
+    GenericRequestor reqC;
+    GenericRequestor reqD;
+    Protocol protocol;
+    
+    public RecursingResponder(Protocol local, RPCPlugin plugin) 
+    throws Exception {
+      super(local);
+      transC = new HttpTransceiver(
+          new URL("http://localhost:21006"));
+      transD = new HttpTransceiver(
+          new URL("http://localhost:21007"));
+      reqC = new GenericRequestor(local, transC);
+      reqC.addRPCPlugin(plugin);
+      reqD = new GenericRequestor(local, transD);
+      reqD.addRPCPlugin(plugin);
+      
+      protocol = local; 
+    }
+
+    @Override
+    public Object respond(Message message, Object request)
+        throws IOException {
+       assertTrue("w".equals(message.getName()));
+       GenericRecord inParams = (GenericRecord)request;
+       Integer currentCount = (Integer) inParams.get("req");
+       assertTrue(currentCount.equals(1));
+       
+       GenericRecord paramsC = new GenericData.Record(
+           protocol.getMessages().get("x").getRequest());
+       paramsC.put("req", currentCount + 1);
+       Integer returnC = (Integer) reqC.request("x", paramsC);
+       assertTrue(returnC.equals(currentCount + 2));
+       
+       GenericRecord paramsD = new GenericData.Record(
+           protocol.getMessages().get("x").getRequest());
+       paramsD.put("req", currentCount + 3);
+       Integer returnD = (Integer) reqD.request("x", paramsD);
+       assertTrue(returnD.equals(currentCount + 4));
+       
+       return currentCount + 5;
+    }
+  }
+  
+  /** Endpoint responder */
+  static class EndpointResponder extends GenericResponder {
+    public EndpointResponder(Protocol local) {
+      super(local);
+    }
+
+    @Override
+    public Object respond(Message message, Object request)
+        throws AvroRemoteException {
+      GenericRecord inParams = (GenericRecord)request;
+      Integer currentCount = (Integer) inParams.get("req");
+      
+      return currentCount + 1;
+    }
+  }
+
+  @Test
+  public void testRecursingTrace() throws Exception {
+    TracePluginConfiguration conf = new TracePluginConfiguration();
+    conf.traceProb = 1.0;
+    conf.port = 51010;
+    conf.clientPort = 12346;
+    TracePlugin aPlugin = new TracePlugin(conf);
+    conf.port = 51011;
+    conf.clientPort = 12347;
+    TracePlugin bPlugin = new TracePlugin(conf);
+    conf.port = 51012;
+    conf.clientPort = 12348;
+    TracePlugin cPlugin = new TracePlugin(conf);
+    conf.port = 51013;
+    conf.clientPort = 12349;
+    TracePlugin dPlugin = new TracePlugin(conf);
+    
+    // Responders
+    Responder bRes = new RecursingResponder(advancedProtocol, bPlugin);
+    bRes.addRPCPlugin(bPlugin);
+    HttpServer server1 = new HttpServer(bRes, 21005);
+    server1.start();
+
+    Responder cRes = new EndpointResponder(advancedProtocol);
+    cRes.addRPCPlugin(cPlugin);
+    HttpServer server2 = new HttpServer(cRes, 21006);
+    server2.start();
+    
+    Responder dRes = new EndpointResponder(advancedProtocol);
+    dRes.addRPCPlugin(dPlugin);
+    HttpServer server3 = new HttpServer(dRes, 21007);
+    server3.start();
+    
+    // Root requestor
+    HttpTransceiver trans = new HttpTransceiver(
+        new URL("http://localhost:21005"));
+    
+    GenericRequestor r = new GenericRequestor(advancedProtocol, trans);
+    r.addRPCPlugin(aPlugin);
+    
+    GenericRecord params = new GenericData.Record(
+        advancedProtocol.getMessages().get("w").getRequest());
+    params.put("req", 1);
+    r.request("w", params);
+    
+    // Verify counts
+    assertEquals(1, aPlugin.storage.getAllSpans().size());
+    assertEquals(3, bPlugin.storage.getAllSpans().size());
+    assertEquals(1, cPlugin.storage.getAllSpans().size());
+    assertEquals(1, dPlugin.storage.getAllSpans().size());
+    
+    ID traceID = aPlugin.storage.getAllSpans().get(0).traceID;
+    ID rootSpanID = null;
+    
+    // Verify event counts and trace ID propagation
+    for (Span s: aPlugin.storage.getAllSpans()) {
+      assertEquals(2, s.events.size());
+      assertTrue(Util.idsEqual(traceID, s.traceID));
+      assertFalse(s.complete);
+      rootSpanID = s.spanID;
+    }
+    
+    for (Span s: bPlugin.storage.getAllSpans()) {
+      assertEquals(2, s.events.size());
+      assertEquals(traceID, s.traceID);
+      assertFalse(s.complete);
+    }
+    
+    for (Span s: cPlugin.storage.getAllSpans()) {
+      assertEquals(2, s.events.size());
+      assertEquals(traceID, s.traceID);
+      assertFalse(s.complete);
+    }
+    for (Span s: dPlugin.storage.getAllSpans()) {
+      assertEquals(2, s.events.size());
+      assertEquals(traceID, s.traceID);
+      assertFalse(s.complete);
+    }
+    
+    // Verify span propagation.
+    ID firstSpanID = aPlugin.storage.getAllSpans().get(0).spanID;
+    ID secondSpanID = cPlugin.storage.getAllSpans().get(0).spanID;
+    ID thirdSpanID = dPlugin.storage.getAllSpans().get(0).spanID;
+    
+    boolean firstFound = false, secondFound = false, thirdFound = false;
+    for (Span s: bPlugin.storage.getAllSpans()) {
+      if (Util.idsEqual(s.spanID, firstSpanID)) {
+        firstFound = true;
+      }
+      else if (Util.idsEqual(s.spanID, secondSpanID)) {
+        secondFound = true;
+      }
+      else if (Util.idsEqual(s.spanID, thirdSpanID)) {
+        thirdFound = true;
+      }
+    }
+    assertTrue(firstFound);
+    assertTrue(secondFound);
+    assertTrue(thirdFound);
+    
+    server1.close();
+    server2.close();
+    server3.close();
+    aPlugin.httpServer.close();
+    aPlugin.clientFacingServer.stop();
+    bPlugin.httpServer.close();
+    bPlugin.clientFacingServer.stop();
+    cPlugin.httpServer.close();
+    cPlugin.clientFacingServer.stop();
+    dPlugin.httpServer.close();
+    dPlugin.clientFacingServer.stop();
+    
+  }
+  
+  /** Sleeps as requested. */
+  private static class SleepyResponder extends GenericResponder {
+    public SleepyResponder(Protocol local) {
+      super(local);
+    }
+
+    @Override
+    public Object respond(Message message, Object request)
+        throws AvroRemoteException {
+      try {
+        Thread.sleep((Long)((GenericRecord)request).get("millis"));
+      } catch (InterruptedException e) {
+        throw new AvroRemoteException(e);
+      }
+      return null;
+    }
+  }
+  
+  /**
+   * Demo program for using RPC trace. This automatically generates
+   * client RPC requests. 
+   * @param args
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length == 0) {
+      args = new String[] { "7002", "7003" };
+    }
+    Protocol protocol = Protocol.parse("{\"protocol\": \"sleepy\", "
+        + "\"messages\": { \"sleep\": {"
+        + "   \"request\": [{\"name\": \"millis\", \"type\": \"long\"}," +
+          "{\"name\": \"data\", \"type\": \"bytes\"}], "
+        + "   \"response\": \"null\"} } }");
+    Log.info("Using protocol: " + protocol.toString());
+    Responder r = new SleepyResponder(protocol);
+    TracePlugin p = new TracePlugin(new TracePluginConfiguration());
+    r.addRPCPlugin(p);
+
+    // Start Avro server
+    new HttpServer(r, Integer.parseInt(args[0]));
+
+    HttpTransceiver trans = new HttpTransceiver(
+        new URL("http://localhost:" + Integer.parseInt(args[0])));
+    GenericRequestor req = new GenericRequestor(protocol, trans); 
+    TracePluginConfiguration clientConf = new TracePluginConfiguration();
+    clientConf.clientPort = 12346;
+    clientConf.port = 12336;
+    clientConf.traceProb = 1.0;
+    req.addRPCPlugin(new TracePlugin(clientConf)); 
+    
+    while(true) {
+      Thread.sleep(1000);
+      GenericRecord params = new GenericData.Record(protocol.getMessages().get(
+        "sleep").getRequest());
+      Random rand = new Random();
+      params.put("millis", Math.abs(rand.nextLong()) % 1000);
+      int payloadSize = Math.abs(rand.nextInt()) % 10000;
+      byte[] payload = new byte[payloadSize];
+      rand.nextBytes(payload);
+      params.put("data", ByteBuffer.wrap(payload));
+      req.request("sleep", params);
+    }
+  }
+}

Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java Thu Aug  5 00:07:24 2010
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.generic.GenericResponder;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.ipc.RPCPlugin;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.trace.SpanAggregator.SpanAggregationResults;
+import org.apache.avro.ipc.trace.SpanAggregator.TraceFormationResults;
+import org.junit.Test;
+
+/**
+ * Tests which test logging, aggregation, and analysis of traces.
+ */
+public class TestEndToEndTracing {
+  
+
+  /*
+   * Messages are w, x, y which request/return 
+   * incrementing int values (shown below).
+   * 
+   *   |-w-(1)-> |         |
+   *   |         |-x-(2)-> | C
+   *   |         | <-x-(3)-|        
+   *   |         |    
+   * A |       B |    
+   *   |         |         |
+   *   |         |-x-(4)-> | 
+   *   |         | <-x-(5)-| D   
+   *   |         |         | 
+   *   |<-w-(6)- |         |
+   *   
+   *   Listening ports are B: 21005
+   *                       C: 21006  
+   *                       D: 21007
+   */
+  
+  Protocol advancedProtocol = Protocol.parse("{\"protocol\": \"Advanced\", "
+      + "\"messages\": { " 
+      + "\"w\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], "
+      + "   \"response\": \"int\"},"
+      + "\"x\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], "
+      + "   \"response\": \"int\"},"
+      + "\"y\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], "
+      + "   \"response\": \"int\"}"
+      + " } }");
+
+  /** Middle Responder */
+  static class RecursingResponder extends GenericResponder {
+    HttpTransceiver transC;
+    HttpTransceiver transD;
+    GenericRequestor reqC;
+    GenericRequestor reqD;
+    Protocol protocol;
+    
+    public RecursingResponder(Protocol local, RPCPlugin plugin) 
+    throws Exception {
+      super(local);
+      transC = new HttpTransceiver(
+          new URL("http://localhost:21006"));
+      transD = new HttpTransceiver(
+          new URL("http://localhost:21007"));
+      reqC = new GenericRequestor(local, transC);
+      reqC.addRPCPlugin(plugin);
+      reqD = new GenericRequestor(local, transD);
+      reqD.addRPCPlugin(plugin);
+      
+      protocol = local; 
+    }
+
+    @Override
+    public Object respond(Message message, Object request)
+        throws IOException {
+       assertTrue("w".equals(message.getName()));
+       GenericRecord inParams = (GenericRecord)request;
+       Integer currentCount = (Integer) inParams.get("req");
+       assertTrue(currentCount.equals(1));
+       
+       GenericRecord paramsC = new GenericData.Record(
+           protocol.getMessages().get("x").getRequest());
+       paramsC.put("req", currentCount + 1);
+       Integer returnC = (Integer) reqC.request("x", paramsC);
+       assertTrue(returnC.equals(currentCount + 2));
+       
+       GenericRecord paramsD = new GenericData.Record(
+           protocol.getMessages().get("x").getRequest());
+       paramsD.put("req", currentCount + 3);
+       Integer returnD = (Integer) reqD.request("x", paramsD);
+       assertTrue(returnD.equals(currentCount + 4));
+       
+       return currentCount + 5;
+    }
+  }
+  
+  /** Endpoint responder */
+  static class EndpointResponder extends GenericResponder {
+    public EndpointResponder(Protocol local) {
+      super(local);
+    }
+
+    @Override
+    public Object respond(Message message, Object request)
+        throws AvroRemoteException {
+      GenericRecord inParams = (GenericRecord)request;
+      Integer currentCount = (Integer) inParams.get("req");
+      
+      return currentCount + 1;
+    }
+  }
+
+  @Test
+  public void testTraceAndCollection() throws Exception {
+    TracePluginConfiguration conf = new TracePluginConfiguration();
+    conf.traceProb = 1.0;
+    conf.port = 51010;
+    conf.clientPort = 12346;
+    TracePlugin aPlugin = new TracePlugin(conf);
+    conf.port = 51011;
+    conf.clientPort = 12347;
+    TracePlugin bPlugin = new TracePlugin(conf);
+    conf.port = 51012;
+    conf.clientPort = 12348;
+    TracePlugin cPlugin = new TracePlugin(conf);
+    conf.port = 51013;
+    conf.clientPort = 12349;
+    TracePlugin dPlugin = new TracePlugin(conf);
+    
+    // Responders
+    Responder bRes = new RecursingResponder(advancedProtocol, bPlugin);
+    bRes.addRPCPlugin(bPlugin);
+    HttpServer server1 = new HttpServer(bRes, 21005);
+    server1.start();
+
+    Responder cRes = new EndpointResponder(advancedProtocol);
+    cRes.addRPCPlugin(cPlugin);
+    HttpServer server2 = new HttpServer(cRes, 21006);
+    server2.start();
+    
+    Responder dRes = new EndpointResponder(advancedProtocol);
+    dRes.addRPCPlugin(dPlugin);
+    HttpServer server3 = new HttpServer(dRes, 21007);
+    server3.start();
+    
+    // Root requestor
+    HttpTransceiver trans = new HttpTransceiver(
+        new URL("http://localhost:21005"));
+    
+    GenericRequestor r = new GenericRequestor(advancedProtocol, trans);
+    r.addRPCPlugin(aPlugin);
+    
+    GenericRecord params = new GenericData.Record(
+        advancedProtocol.getMessages().get("w").getRequest());
+    params.put("req", 1);
+    r.request("w", params);
+    
+    ArrayList<Span> allSpans = new ArrayList<Span>();
+    
+    allSpans.addAll(aPlugin.storage.getAllSpans());
+    allSpans.addAll(bPlugin.storage.getAllSpans());
+    allSpans.addAll(cPlugin.storage.getAllSpans());
+    allSpans.addAll(dPlugin.storage.getAllSpans());
+    
+    SpanAggregationResults results = SpanAggregator.getFullSpans(allSpans);
+    
+    TraceFormationResults traces = SpanAggregator.getTraces(results.completeSpans);
+    
+    assertEquals(1, traces.traces.size());
+    assertEquals(0, traces.rejectedSpans.size());
+    
+    // Test debug printing of traces
+    String string1 = traces.traces.get(0).printWithTiming();
+    assertTrue(string1.contains("w"));
+    assertTrue(string1.contains("x"));
+    assertTrue(string1.indexOf("x") != string1.lastIndexOf("x")); // assure two x's
+    
+    String string2 = traces.traces.get(0).printBrief();
+    assertTrue(string2.contains("w"));
+    assertTrue(string2.contains("x"));
+    assertTrue(string2.indexOf("x") != string2.lastIndexOf("x")); // assure two x's
+    
+    // Just for fun, print to console
+    System.out.println(traces.traces.get(0).printWithTiming());
+    System.out.println(traces.traces.get(0).printBrief());
+    
+    server1.close();
+    server2.close();
+    server3.close();
+    aPlugin.httpServer.close();
+    aPlugin.clientFacingServer.stop();
+    bPlugin.httpServer.close();
+    bPlugin.clientFacingServer.stop();
+    cPlugin.httpServer.close();
+    cPlugin.clientFacingServer.stop();
+    dPlugin.httpServer.close();
+    dPlugin.clientFacingServer.stop();
+  }
+  
+  /** Sleeps as requested. */
+  private static class SleepyResponder extends GenericResponder {
+    public SleepyResponder(Protocol local) {
+      super(local);
+    }
+
+    @Override
+    public Object respond(Message message, Object request)
+        throws AvroRemoteException {
+      try {
+        Thread.sleep((Long)((GenericRecord)request).get("millis"));
+      } catch (InterruptedException e) {
+        throw new AvroRemoteException(e);
+      }
+      return null;
+    }
+  }
+}

Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanAggregation.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanAggregation.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanAggregation.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanAggregation.java Thu Aug  5 00:07:24 2010
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.apache.avro.ipc.trace.Util.idValue;
+import static org.apache.avro.ipc.trace.Util.idsEqual;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.ipc.trace.SpanAggregator.SpanAggregationResults;
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+
+import edu.emory.mathcs.backport.java.util.Arrays;
+
+/**
+ * Tests for span aggregation utility methods.
+ */
+public class TestSpanAggregation {
+  /**
+   * Test merging of two basic spans.
+   */
+  @Test
+  public void testSpanCompletion1() {
+    Span span1a = createClientSpan(idValue(1), idValue(1), null, new Utf8("a"));
+    Span span1b = createServerSpan(idValue(1), idValue(1), null, new Utf8("a"));
+    
+    List<Span> partials = new ArrayList<Span>();
+    partials.add(span1a);
+    partials.add(span1b);
+    SpanAggregationResults results = SpanAggregator.getFullSpans(partials);
+    
+    assertNotNull(results.completeSpans);
+    assertNotNull(results.incompleteSpans);
+    assertTrue(results.incompleteSpans.size() == 0); 
+    assertTrue(results.completeSpans.size() == 1);
+    
+    Span result = results.completeSpans.get(0);
+    assertEquals(null, result.parentSpanID);
+    assertTrue(idsEqual(idValue(1), result.spanID));
+    assertEquals(4, result.events.size());
+  }
+  
+  /**
+   * Test span merging with some extra invalid spans;
+   */
+  @Test
+  public void testInvalidSpanCompletion() {
+    // Trace: 1, Span: 1, Parent: null 
+    Span span1a = createClientSpan(idValue(1), idValue(1), null, new Utf8("a"));
+    Span span1b = createServerSpan(idValue(1), idValue(1), null, new Utf8("a"));
+    
+    // Trace: 1, Span: 10, Parent: 3 
+    Span spanBogus1 = createClientSpan(idValue(1), idValue(10), idValue(3), new Utf8("not"));
+    Span spanBogus2 = createServerSpan(idValue(1), idValue(10), idValue(3), new Utf8("equal"));
+    
+    // Trace: 1, Span: 5, Parent: (2/3) 
+    Span spanBogus3 = createClientSpan(idValue(1), idValue(5), idValue(2), new Utf8("equal"));
+    Span spanBogus4 = createServerSpan(idValue(1), idValue(5), idValue(3), new Utf8("equal"));
+    
+    // Trace:1, Span: 4, Parent: 1
+    Span spanBogus5 = createClientSpan(idValue(1), idValue(4), idValue(1), new Utf8("alone"));
+    
+    List<Span> partials = new ArrayList<Span>();
+    partials.add(span1a);
+    partials.add(span1b);
+    partials.add(spanBogus1);
+    partials.add(spanBogus2);
+    partials.add(spanBogus3);
+    partials.add(spanBogus4);
+    partials.add(spanBogus5);
+    
+    SpanAggregationResults results = SpanAggregator.getFullSpans(partials);
+    assertNotNull(results.completeSpans);
+    assertNotNull(results.incompleteSpans);
+
+    assertTrue(results.incompleteSpans.size() == 5);
+    assertTrue(results.incompleteSpans.contains(spanBogus1));
+    assertTrue(results.incompleteSpans.contains(spanBogus2));
+    assertTrue(results.incompleteSpans.contains(spanBogus3));
+    assertTrue(results.incompleteSpans.contains(spanBogus4));
+    assertTrue(results.incompleteSpans.contains(spanBogus5));
+    
+    assertTrue(results.completeSpans.size() == 1);
+    Span result = results.completeSpans.get(0);
+    assertTrue(result.complete);
+    assertTrue(idsEqual(idValue(1), result.spanID));
+    assertEquals(new Utf8("requestorHostname"), result.requestorHostname);
+    assertEquals(new Utf8("responderHostname"), result.responderHostname);
+    assertNull(result.parentSpanID);
+    assertEquals(new Utf8("a"), result.messageName);
+  }
+  
+  /**
+   * Test basic formation of a trace.
+   *      a                      
+   *      b                    
+   *    c   d
+   *        e                    
+   */
+  @Test
+  public void testTraceFormation1() {
+    Span a1 = createClientSpan(idValue(1), idValue(1), null, new Utf8("a"));
+    Span a2 = createServerSpan(idValue(1), idValue(1), null, new Utf8("a"));
+    
+    Span b1 = createClientSpan(idValue(1), idValue(2), idValue(1), new Utf8("b"));
+    Span b2 = createServerSpan(idValue(1), idValue(2), idValue(1), new Utf8("b"));
+
+    Span c1 = createClientSpan(idValue(1), idValue(3), idValue(2), new Utf8("c"));
+    Span c2 = createServerSpan(idValue(1), idValue(3), idValue(2), new Utf8("c"));
+    
+    Span d1 = createClientSpan(idValue(1), idValue(4), idValue(2), new Utf8("d"));
+    Span d2 = createServerSpan(idValue(1), idValue(4), idValue(2), new Utf8("d"));
+    
+    Span e1 = createClientSpan(idValue(1), idValue(5), idValue(4), new Utf8("e"));
+    Span e2 = createServerSpan(idValue(1), idValue(5), idValue(4), new Utf8("e"));
+    
+    List<Span> spans = new LinkedList<Span>();
+    spans.addAll(Arrays.asList(new Span[] {a1, a2, b1, b2, c1, c2, d1, d2, e1, e2}));
+    
+    List<Span> merged = SpanAggregator.getFullSpans(spans).completeSpans;
+    
+    assertEquals(5, merged.size());
+    for (Span s: merged) {
+      assertEquals(new Utf8("requestorHostname"), s.requestorHostname);
+      assertEquals(new Utf8("responderHostname"), s.responderHostname);
+    }
+    
+    List<Trace> traces = SpanAggregator.getTraces(merged).traces;
+    assertEquals(1, traces.size());
+    
+    assertEquals("Trace: (a (b (c) (d (e))))", traces.get(0).printBrief());
+    
+  }
+  
+  /**
+   * Make a mock Span including client-side timing data.
+   */
+  public Span createClientSpan(ID traceID, ID spanID, ID parentID, Utf8 msgName) {
+    Span out = new Span();
+    out.spanID = spanID;
+    out.traceID = traceID;
+    out.requestorHostname = new Utf8("requestorHostname");
+    
+    if (parentID != null) {
+      out.parentSpanID = parentID;
+    }
+    out.messageName = msgName;
+    out.complete = false;
+    
+    TimestampedEvent event1 = new TimestampedEvent();
+    event1.event = SpanEvent.CLIENT_SEND;
+    event1.timeStamp = System.currentTimeMillis() * 1000000;
+    
+    TimestampedEvent event2 = new TimestampedEvent();
+    event2.event = SpanEvent.CLIENT_RECV;
+    event2.timeStamp = System.currentTimeMillis() * 1000000;
+    
+    out.events = new GenericData.Array(
+        2, Schema.createArray(TimestampedEvent.SCHEMA$));
+    out.events.add(event1);
+    out.events.add(event2);
+    
+    return out;
+  }
+  
+  /**
+   * Make a mock Span including server-side timing data.
+   */
+  public Span createServerSpan(ID traceID, ID spanID, ID parentID, Utf8 msgName) {
+    Span out = new Span();
+    out.spanID = spanID;
+    out.traceID = traceID;
+    out.responderHostname = new Utf8("responderHostname");
+    
+    if (parentID != null) {
+      out.parentSpanID = parentID;
+    }
+    out.messageName = msgName;
+    out.complete = false;
+    
+    TimestampedEvent event1 = new TimestampedEvent();
+    event1.event = SpanEvent.SERVER_RECV;
+    event1.timeStamp = System.currentTimeMillis();
+    
+    TimestampedEvent event2 = new TimestampedEvent();
+    event2.event = SpanEvent.SERVER_SEND;
+    event2.timeStamp = System.currentTimeMillis();
+    
+    out.events = new GenericData.Array(
+        2, Schema.createArray(TimestampedEvent.SCHEMA$));
+    out.events.add(event1);
+    out.events.add(event2);
+    
+    return out;
+  }
+}