You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ph...@apache.org on 2010/08/05 02:07:25 UTC
svn commit: r982434 [1/2] - in /avro/trunk: ./
lang/java/src/java/org/apache/avro/ipc/
lang/java/src/java/org/apache/avro/ipc/trace/
lang/java/src/test/java/org/apache/avro/ipc/trace/
share/schemas/org/apache/avro/ipc/trace/
Author: philz
Date: Thu Aug 5 00:07:24 2010
New Revision: 982434
URL: http://svn.apache.org/viewvc?rev=982434&view=rev
Log:
AVRO-595. Add Basic Trace Collection and Propagation.
(Patrick Wendell via philz)
Added:
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanAggregator.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Trace.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceClientServlet.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceNode.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java
avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/
avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestBasicTracing.java
avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java
avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanAggregation.java
avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanTraceFormation.java
avro/trunk/share/schemas/org/apache/avro/ipc/trace/
avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl
avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=982434&r1=982433&r2=982434&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Aug 5 00:07:24 2010
@@ -13,6 +13,9 @@ Avro 1.4.0 (unreleased)
NEW FEATURES
+ AVRO-595. Add Basic Trace Collection and Propagation.
+ (Patrick Wendell via philz)
+
AVRO-493. Add support for Hadoop Mapreduce with Avro data files. (cutting)
AVRO-285: Specify one-way messages and implement in Java. (cutting)
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java?rev=982434&r1=982433&r2=982434&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java Thu Aug 5 00:07:24 2010
@@ -102,15 +102,15 @@ public abstract class Requestor {
writeRequest(m.getRequest(), request, out); // write request payload
List<ByteBuffer> payload = bbo.getBufferList();
+ writeHandshake(out); // prepend handshake if needed
+ META_WRITER.write(context.requestCallMeta(), out);
+ out.writeString(m.getName()); // write message name
+
context.setRequestPayload(payload);
for (RPCPlugin plugin : rpcMetaPlugins) {
plugin.clientSendRequest(context); // get meta-data from plugins
}
- writeHandshake(out); // prepend handshake if needed
- META_WRITER.write(context.requestCallMeta(), out);
- out.writeString(m.getName()); // write message name
-
bbo.append(payload);
List<ByteBuffer> requestBytes = bbo.getBufferList();
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java Thu Aug 5 00:07:24 2010
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc.trace;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Example implementation of SpanStorage which keeps spans in memory.
+ *
+ * This is designed as a prototype for demonstration and testing. It should
+ * not be used in production settings.
+ *
+ */
+public class InMemorySpanStorage implements SpanStorage {
+ private static final long DEFAULT_MAX_SPANS = 10000;
+
+ protected LinkedList<Span> spans;
+ private long maxSpans;
+
+ public InMemorySpanStorage() {
+ this.spans = new LinkedList<Span>();
+ this.maxSpans = DEFAULT_MAX_SPANS;
+ }
+
+ @Override
+ public void addSpan(Span s) {
+ synchronized (this.spans) {
+ this.spans.add(s);
+ if (this.spans.size() > this.maxSpans) {
+ this.spans.removeFirst();
+ }
+ }
+ }
+
+ @Override
+ public void setMaxSpans(long maxSpans) {
+ this.maxSpans = maxSpans;
+
+ synchronized (this.spans) {
+ while (this.spans.size() > maxSpans) {
+ this.spans.removeFirst();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<Span> getAllSpans() {
+ return (LinkedList<Span>) this.spans.clone();
+ }
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanAggregator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanAggregator.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanAggregator.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanAggregator.java Thu Aug 5 00:07:24 2010
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import static org.apache.avro.ipc.trace.Util.idsEqual;
+import static org.apache.avro.ipc.trace.Util.longValue;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.emory.mathcs.backport.java.util.Arrays;
+
+/**
+ * Utility methods for aggregating spans together at various
+ * points of trace analysis.
+ */
+public class SpanAggregator {
+ /**
+ * Class to store the results of span aggregation.
+ */
+ public static class SpanAggregationResults {
+ /** Spans which have data from client and server. */
+ public List<Span> completeSpans;
+
+ /** Spans which have data only from client or server, or in which
+ * an ID collision was detected.*/
+ public List<Span> incompleteSpans;
+
+ public SpanAggregationResults() {
+ completeSpans = new LinkedList<Span>();
+ incompleteSpans = new LinkedList<Span>();
+ }
+ }
+
+ /**
+ * Class to store the results of trace formation.
+ */
+ public static class TraceFormationResults {
+ /** Traces which were successfully created. */
+ public List<Trace> traces;
+
+ /** Spans which did not describe a complete trace. */
+ public List<Span> rejectedSpans;
+
+ public TraceFormationResults() {
+ traces = new LinkedList<Trace>();
+ rejectedSpans = new LinkedList<Span>();
+ }
+ }
+
+ /**
+ * Merge a list of incomplete spans (data filled in from only client or
+ * server) into complete spans (full client and server data).
+ */
+ @SuppressWarnings("unchecked")
+ static SpanAggregationResults getFullSpans(List<Span> partials) {
+ SpanAggregationResults out = new SpanAggregationResults();
+ HashMap<Long, Span> seenSpans = new HashMap<Long, Span>();
+ List<SpanEvent> allEvents = (List<SpanEvent>) Arrays.asList(
+ SpanEvent.values());
+
+ for (Span s: partials) {
+ EnumSet<SpanEvent> foundEvents = Util.getAllEvents(s);
+
+ // Span has complete data already
+ if (foundEvents.containsAll(allEvents)) {
+ out.completeSpans.add(s);
+ }
+ // We haven't seen other half yet
+ else if (!seenSpans.containsKey(
+ Util.longValue(s.spanID))) {
+ seenSpans.put(Util.longValue(s.spanID), s);
+ }
+ // We have seen other half
+ else {
+ Span other = seenSpans.remove(Util.longValue(s.spanID));
+ if (!other.messageName.equals(s.messageName) ||
+ !idsEqual(other.parentSpanID, s.parentSpanID)) {
+ out.incompleteSpans.add(s);
+ out.incompleteSpans.add(other);
+ } else {
+ foundEvents.addAll(Util.getAllEvents(other));
+ if (other.requestorHostname != null) {
+ s.requestorHostname = other.requestorHostname;
+ }
+ if (other.responderHostname != null) {
+ s.responderHostname = other.responderHostname;
+ }
+
+ // We have a complete span between the two
+ if (foundEvents.containsAll(allEvents)) {
+ for (TimestampedEvent event: other.events) {
+ s.events.add(event);
+ }
+ }
+ s.complete = true;
+ out.completeSpans.add(s);
+ }
+ }
+ }
+
+ // Flush unmatched spans
+ for (Span s: seenSpans.values()) {
+ out.incompleteSpans.add(s);
+ }
+ return out;
+ }
+
+ /**
+ * Given a list of Spans extract as many Trace objects as possible.
+ * A {@link Trace} is a tree-like data structure containing spans.
+ */
+ static TraceFormationResults getTraces(List<Span> spans) {
+ /** Traces indexed by traceID. */
+ HashMap<Long, List<Span>> traces = new HashMap<Long, List<Span>>();
+
+ for (Span s: spans) {
+ if (traces.get(longValue(s.traceID)) == null) {
+ traces.put(longValue(s.traceID), new ArrayList<Span>());
+ }
+ traces.get(longValue(s.traceID)).add(s);
+ }
+
+ TraceFormationResults out = new TraceFormationResults();
+
+ for (List<Span> spanSet : traces.values()) {
+ Trace trace = Trace.extractTrace(spanSet);
+ if (trace != null) {
+ out.traces.add(trace);
+ }
+ else {
+ out.rejectedSpans.addAll(spanSet);
+ }
+ }
+ return out;
+ }
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java Thu Aug 5 00:07:24 2010
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc.trace;
+
+import java.util.List;
+
+/**
+ * Responsible for storing spans locally and answering span queries.
+ *
+ * Since query for a given set of spans may persist over several RPC
+ * calls, they are indexed by a handle.
+ *
+ */
+public interface SpanStorage {
+ /**
+ * Add a span.
+ * @param s
+ */
+ void addSpan(Span s);
+
+ /**
+ * Set the maximum number of spans to have in storage at any given time.
+ */
+ void setMaxSpans(long maxSpans);
+
+ /**
+ * Return a list of all spans currently stored. For testing.
+ */
+ List<Span> getAllSpans();
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Trace.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Trace.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Trace.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Trace.java Thu Aug 5 00:07:24 2010
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Collections;
+
+/**
+ * A Trace is a tree of spans which reflects the actual call structure of a
+ * recursive RPC call tree. Each node in a Trace represents a RPC
+ * request/response pair. Each node also has zero or more child nodes.
+ */
+public class Trace {
+ private TraceNode root;
+
+ /**
+ * Construct a trace given a root TraceNode.
+ */
+ public Trace(TraceNode root) {
+ this.root = root;
+ }
+
+ /**
+ * Return the root node of this trace.
+ */
+ public TraceNode getRoot() {
+ return this.root;
+ }
+
+ /**
+ * Provide a hashCode unique to the execution path of this trace.
+ *
+ * This is useful for grouping several traces which represent the same
+ * execution path (for instance, when we want to calculate averages for a
+ * large number of identical traces).
+ */
+ public int executionPathHash() {
+ // The string representation will be unique to a call tree, so we
+ // can borrow the hashCode from that string.
+ return this.printBrief().hashCode();
+ }
+
+ private class NodeComparator implements Comparator<TraceNode> {
+ @Override
+ public int compare(TraceNode tn0, TraceNode tn1) {
+ // We sort nodes alphabetically by the message name
+ int result = tn0.span.messageName.compareTo(tn1.span.messageName);
+
+ if (result != 0) {
+ return result;
+ }
+ /* NOTE:
+ * If two spans containing the *same* RPC message share a parent, we need
+ * a way to consistently order them. Here, we use the send time to
+ * break ties. This will only work deterministically for non-async
+ * clients. For asynchronous clients, aggregated statistics based on this
+ * ordering may be incorrect, since we have no way to disambiguate one
+ * function call from another.
+ */
+ else {
+ Long tn0SendTime = tn0.extractEventTime(tn0, SpanEvent.CLIENT_SEND);
+ Long tn1SendTime = tn1.extractEventTime(tn1, SpanEvent.CLIENT_SEND);
+
+ return tn0SendTime.compareTo(tn1SendTime);
+ }
+ }
+ }
+
+ /**
+ * Print a brief description of this trace describing the execution
+ * path, but not timing data. This is for debugging or quickly profiling
+ * traces.
+ *
+ * For instance the trace:
+ * x
+ * /
+ * w
+ * \
+ * y--z
+ *
+ * is encoded as:
+ * (w (x) (y (z)))
+ */
+ public String printBrief() {
+ if (this.root == null) { return "Trace: <empty>"; }
+ String out = "Trace: (";
+ out += this.root.span.messageName + " ";
+ out += printBriefRecurse(root.children);
+ out += ")";
+ return out;
+ }
+
+ private String printBriefRecurse(List<TraceNode> children) {
+ String out = "";
+ // We sort so equivalent traces always print identically
+ Collections.sort(children, new NodeComparator());
+ for (int i = 0; i < children.size(); i++) {
+ TraceNode tn = children.get(i);
+ out += "(" + tn.span.messageName;
+ if (tn.children.size() > 0) {
+ out += " ";
+ out += printBriefRecurse(tn.children);
+ }
+ out += ")";
+ if (i != children.size() - 1) {
+ out += " ";
+ }
+ }
+ return out;
+ }
+
+ /**
+ * Print a description of this trace which includes timing data. This is for
+ * debugging or quickly profiling traces.
+ *
+ * For instance the trace:
+ * x
+ * /
+ * w
+ * \
+ * x
+ *
+ * Might print as:
+ * w 87ms
+ * x 10ms
+ * x 2ms
+ */
+ public String printWithTiming() {
+ if (this.root == null) { return "Trace: <empty>"; }
+ String out = "Trace: " + "\n";
+ List<TraceNode> rootList = new LinkedList<TraceNode>();
+ rootList.add(this.root);
+ out += printWithTimingRecurse(rootList, 0);
+ return out;
+ }
+
+ private String printWithTimingRecurse(List<TraceNode> children, int depth) {
+ String out = "";
+ // We sort so equivalent traces always print identically
+ Collections.sort(children, new NodeComparator());
+ for (TraceNode tn : children) {
+ long clientSend = 0;
+ long clientReceive = 0;
+ for (TimestampedEvent te: tn.span.events) {
+ if (te.event instanceof SpanEvent) {
+ SpanEvent ev = (SpanEvent) te.event;
+ if (ev.equals(SpanEvent.CLIENT_RECV)) {
+ clientReceive = te.timeStamp / 1000000;
+ } else if (ev.equals(SpanEvent.CLIENT_SEND)) {
+ clientSend = te.timeStamp / 1000000;
+ }
+ }
+ }
+
+ for (int i = 0; i < depth; i++) { out = out + " "; } // indent
+ out += tn.span.messageName + " " + (clientReceive - clientSend) + "ms\n";
+ if (tn.children.size() > 0) {
+ out += printWithTimingRecurse(tn.children, depth + 1);
+ }
+ }
+
+ return out;
+ }
+
+ /**
+ * Construct a Trace from a list of Span objects. If no such trace
+ * can be created (if the list does not describe a complete trace)
+ * returns null.
+ */
+ public static Trace extractTrace(List<Span> spans) {
+ /**
+ * Map of span id's to a list of child span id's
+ */
+ HashMap<Long, List<Long>> children = new HashMap<Long, List<Long>>();
+
+ /**
+ * Map of span id's to spans
+ */
+ HashMap<Long, Span> spanRef = new HashMap<Long, Span>();
+
+ /**
+ * Root span
+ */
+ Span rootSpan = null;
+
+ for (Span s: spans) {
+ spanRef.put(Util.longValue(s.spanID), s);
+ if (s.parentSpanID == null) {
+ rootSpan = s;
+ } else {
+ if (children.get(Util.longValue(s.parentSpanID)) == null) {
+ LinkedList<Long> list = new LinkedList<Long>();
+ list.add(Util.longValue(s.spanID));
+ children.put(Util.longValue(s.parentSpanID), list);
+ } else {
+ children.get(Util.longValue(s.parentSpanID)).add(
+ Util.longValue(s.spanID));
+ }
+ }
+ }
+ if (rootSpan == null) { // We never found a root
+ return null;
+ }
+ TraceNode rootNode = getNode(rootSpan, spanRef, children);
+ return new Trace(rootNode);
+ }
+
+ /**
+ * Recursive helper method to create a span tree.
+ */
+ private static TraceNode getNode(
+ Span s, HashMap<Long, Span> spanRef, HashMap<Long, List<Long>> children) {
+ TraceNode out = new TraceNode();
+ out.span = s;
+ out.children = new LinkedList<TraceNode>();
+
+ List<Long> kids = children.get(Util.longValue(s.spanID));
+ if (kids == null) { return out; } // no children (base case)
+
+ for (long childID: kids) {
+ Span childSpan = spanRef.get(childID);
+ if (childSpan == null) { return null; } // invalid span reference
+ out.children.add(getNode(childSpan, spanRef, children));
+ }
+ return out;
+ }
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceClientServlet.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceClientServlet.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceClientServlet.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceClientServlet.java Thu Aug 5 00:07:24 2010
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+public class TraceClientServlet extends HttpServlet {
+ public void doPost(HttpServletRequest request,
+ HttpServletResponse response)
+ throws ServletException, IOException {
+ response.setContentType("text/html");
+ PrintWriter out = response.getWriter();
+
+ out.println("<title>Example</title> <body bgcolor=FFFFFF>");
+ out.println("<h2>Button Clicked</h2>");
+
+ String servers = request.getParameter("servers");
+
+ if(servers != null){
+ String splitToken = System.getProperty("line.separator");
+ if (splitToken == null) {
+ splitToken = "\n";
+ }
+ String[] parts = servers.split(splitToken);
+ for (String p : parts) {
+ out.println(p + "<br>");
+ }
+ } else {
+ out.println("No text entered.");
+ }
+ }
+
+ public void doGet(HttpServletRequest request,
+ HttpServletResponse response) throws IOException {
+ response.setContentType("text/html");
+ PrintWriter out = response.getWriter();
+
+ out.println("<html>");
+ out.println("<head>Form</head>");
+ out.println("<body><form method='post'>");
+ out.println("<textarea name='servers'></textarea>");
+ out.println("<input type='submit' name='submit'>");
+ out.println("</form>");
+ out.println("</html>");
+ }
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceNode.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceNode.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceNode.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceNode.java Thu Aug 5 00:07:24 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import java.util.List;
+
+/**
+ * A node of of an RPC {@link Trace}. Each node stores a {@link Span} object
+ * and a list of zero or more child nodes.
+ */
+class TraceNode {
+ /**
+ * The {@link Span} to which corresponds to this node in the call tree.
+ */
+ public Span span;
+
+ /**
+ * A list of this TraceNode's children.
+ */
+ public List<TraceNode> children;
+
+ public TraceNode(Span span, List<TraceNode> children) {
+ this.span = span;
+ this.children = children;
+ }
+
+ public TraceNode() {
+
+ }
+
+ /**
+ * Return the time stamp associated with a particular SpanEvent in this
+ * Trace Node. Return -1 if the TraceNode's Span did not contain that event.
+ */
+ public long extractEventTime(TraceNode tn, SpanEvent e) {
+ for (TimestampedEvent te: tn.span.events) {
+ if ((te.event instanceof SpanEvent) &&
+ (SpanEvent) te.event == e) {
+ return te.timeStamp;
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * Return time delta between { @link SpanEvent.CLIENT_SEND } and
+ * { @link SpanEvent.SERVER_RECV }. This may be negative or zero in the
+ * case of clock skew.
+ */
+ public long getPreNetworkTime() {
+ long clientSend = extractEventTime(this, SpanEvent.CLIENT_SEND);
+ long serverReceive = extractEventTime(this, SpanEvent.SERVER_RECV);
+
+ return serverReceive - clientSend;
+ }
+
+ /**
+ * Return time delta between { @link SpanEvent.SERVER_SEND } and
+ * { @link SpanEvent.CLIENT_RECV }. This may be negative or zero in the
+ * case of clock skew.
+ */
+ public long getPostNetworkTime() {
+ long serverSend = extractEventTime(this, SpanEvent.SERVER_SEND);
+ long clientReceive = extractEventTime(this, SpanEvent.CLIENT_RECV);
+
+ return clientReceive - serverSend;
+ }
+
+ /**
+ * Return time delta between { @link SpanEvent.SERVER_RECV } and
+ * { @link SpanEvent.SERVER_SEND}.
+ */
+ public long getProcessTime() {
+ long serverReceive = extractEventTime(this, SpanEvent.SERVER_RECV);
+ long serverSend = extractEventTime(this, SpanEvent.SERVER_SEND);
+
+ return serverSend - serverReceive;
+ }
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java Thu Aug 5 00:07:24 2010
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc.trace;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.RPCContext;
+import org.apache.avro.ipc.RPCPlugin;
+import org.apache.avro.specific.SpecificResponder;
+import org.apache.avro.util.Utf8;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.bio.SocketConnector;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A tracing plugin for Avro.
+ *
+ * This plugin traces RPC call timing and follows nested trees of RPC
+ * calls. To use, instantiate and add to an existing Requestor or
+ * Responder. If you have a Responder that itself acts as an RPC client (i.e.
+ * it contains a Requestor) be sure to pass the same instance of the
+ * plugin to both components so that propagation occurs correctly.
+ *
+ * Currently, propagation only works if each requester is performing
+ * serial RPC calls. That is, we cannot support the case in which the
+ * Requestor's request does not occur in the same thread as the Responder's
+ * response.
+ */
+public class TracePlugin extends RPCPlugin {
+ final private static Random RANDOM = new Random();
+ private static final Logger LOG = LoggerFactory.getLogger(TracePlugin.class);
+ public static enum StorageType { MEMORY, DISK };
+
+ /*
+ * This plugin uses three key/value meta-data pairs. The value type for all
+ * of these pairs is fixed(8) and they are assumed to be encoded as 8-byte
+ * ID's. The presence of a TRACE_ID_KEY and a SPAN_ID_KEY in a message
+ * signals that tracing is in progress. The optional PARENT_SPAN_ID_KEY
+ * signals that this message has a parent node in the RPC call tree.
+ */
+ private static final Utf8 TRACE_ID_KEY = new Utf8("traceID");
+ private static final Utf8 SPAN_ID_KEY = new Utf8("spanID");
+ private static final Utf8 PARENT_SPAN_ID_KEY = new Utf8("parentSpanID");
+
+
+ class TraceResponder implements AvroTrace {
+ private SpanStorage spanStorage;
+
+ public TraceResponder(SpanStorage spanStorage) {
+ this.spanStorage = spanStorage;
+ }
+
+ @Override
+ public GenericArray<Span> getAllSpans() throws AvroRemoteException {
+ List<Span> spans = this.spanStorage.getAllSpans();
+ GenericData.Array<Span> out;
+ synchronized (spans) {
+ out = new GenericData.Array<Span>(spans.size(),
+ Schema.createArray(Span.SCHEMA$));
+ for (Span s: spans) {
+ out.add(s);
+ }
+ }
+ return out;
+ }
+ }
+
+ private double traceProb; // Probability of starting tracing
+ private int port; // Port to serve tracing data
+ private int clientPort; // Port to expose client HTTP interface
+ private StorageType storageType; // How to store spans
+ private long maxSpans; // Max number of spans to store
+ private boolean enabled; // Whether to participate in tracing
+
+ private ThreadLocal<Span> currentSpan; // span in which we are server
+ private ThreadLocal<Span> childSpan; // span in which we are client
+
+ // Storage and serving of spans
+ protected SpanStorage storage;
+ protected HttpServer httpServer;
+ protected SpecificResponder responder;
+
+ // Client interface
+ protected Server clientFacingServer;
+
+ public TracePlugin(TracePluginConfiguration conf) throws IOException {
+ traceProb = conf.traceProb;
+ port = conf.port;
+ clientPort = conf.clientPort;
+ storageType = conf.storageType;
+ maxSpans = conf.maxSpans;
+ enabled = conf.enabled;
+
+ // check bounds
+ if (!(traceProb >= 0.0 && traceProb <= 1.0)) { traceProb = 0.0; }
+ if (!(port > 0 && port < 65535)) { port = 51001; }
+ if (!(clientPort > 0 && clientPort < 65535)) { clientPort = 51200; }
+ if (maxSpans < 0) { maxSpans = 5000; }
+
+ currentSpan = new ThreadLocal<Span>(){
+ @Override protected Span initialValue(){
+ return null;
+ }
+ };
+
+ childSpan = new ThreadLocal<Span>(){
+ @Override protected Span initialValue(){
+ return null;
+ }
+ };
+
+ if (storageType.equals("MEMORY")) {
+ this.storage = new InMemorySpanStorage();
+ }
+ else { // default
+ this.storage = new InMemorySpanStorage();
+ }
+
+ this.storage.setMaxSpans(maxSpans);
+
+ // Start serving span data
+ responder = new SpecificResponder(
+ AvroTrace.PROTOCOL, new TraceResponder(this.storage));
+
+ boolean bound = false;
+
+ while (!bound) {
+ // rather than die if port is taken, try to fail over to another port.
+ try {
+ httpServer = new HttpServer(responder, this.port);
+ bound = true;
+ } catch (AvroRuntimeException e) {
+ if (e.getCause() instanceof BindException) {
+ LOG.error("Failed to bind to port: " + this.port);
+ this.port = this.port +1;
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ // Start client-facing servlet
+ initializeClientServer();
+ }
+
+ @Override
+ public void clientStartConnect(RPCContext context) {
+ // There are two cases in which we will need to seed a trace
+ // (1) If we probabilistically decide to seed a new trace
+ // (2) If we are part of an existing trace
+
+ if ((this.currentSpan.get() == null) &&
+ (RANDOM.nextFloat() < this.traceProb) && enabled) {
+ // Start new trace
+ Span span = Util.createEventlessSpan(null, null, null);
+ this.childSpan.set(span);
+ }
+
+ if ((this.currentSpan.get() != null) && enabled) {
+ Span currSpan = this.currentSpan.get();
+ Span span = Util.createEventlessSpan(
+ currSpan.traceID, null, currSpan.spanID);
+ this.childSpan.set(span);
+ }
+
+ if (this.childSpan.get() != null) {
+ Span span = this.childSpan.get();
+ context.requestHandshakeMeta().put(
+ TRACE_ID_KEY, ByteBuffer.wrap(span.traceID.bytes()));
+ context.requestHandshakeMeta().put(
+ SPAN_ID_KEY, ByteBuffer.wrap(span.spanID.bytes()));
+ if (span.parentSpanID != null) {
+ context.requestHandshakeMeta().put(
+ PARENT_SPAN_ID_KEY, ByteBuffer.wrap(span.parentSpanID.bytes()));
+ }
+ }
+ }
+
+ @Override
+ public void serverConnecting(RPCContext context) {
+ Map<Utf8, ByteBuffer> meta = context.requestHandshakeMeta();
+ // Are we being asked to propagate a trace?
+ if (meta.containsKey(TRACE_ID_KEY) && enabled) {
+ if (!(meta.containsKey(SPAN_ID_KEY))) {
+ LOG.warn("Span ID missing for trace " +
+ meta.get(TRACE_ID_KEY).toString());
+ return; // should have been given full span data
+ }
+ byte[] spanIDBytes = new byte[8];
+ meta.get(SPAN_ID_KEY).get(spanIDBytes);
+ ID spanID = new ID();
+ spanID.bytes(spanIDBytes);
+
+ ID parentSpanID = null;
+ if (meta.get(PARENT_SPAN_ID_KEY) != null) {
+ parentSpanID = new ID();
+ parentSpanID.bytes(meta.get(PARENT_SPAN_ID_KEY).array());
+ }
+ ID traceID = new ID();
+ traceID.bytes(meta.get(TRACE_ID_KEY).array());
+
+ Span span = Util.createEventlessSpan(traceID, spanID, parentSpanID);
+
+ span.events = new GenericData.Array<TimestampedEvent>(
+ 100, Schema.createArray(TimestampedEvent.SCHEMA$));
+ this.currentSpan.set(span);
+ }
+ }
+
+ @Override
+ public void clientFinishConnect(RPCContext context) { }
+
+ @Override
+ public void clientSendRequest(RPCContext context) {
+ if (this.childSpan.get() != null) {
+ Span child = this.childSpan.get();
+ Util.addEvent(child, SpanEvent.CLIENT_SEND);
+ child.messageName = new Utf8(
+ context.getMessage().getName());
+ child.requestPayloadSize = Util.getPayloadSize(
+ context.getRequestPayload());
+ }
+ }
+
+ @Override
+ public void serverReceiveRequest(RPCContext context) {
+ if (this.currentSpan.get() != null) {
+ Span current = this.currentSpan.get();
+ Util.addEvent(current, SpanEvent.SERVER_RECV);
+ current.messageName = new Utf8(
+ context.getMessage().getName());
+ current.requestPayloadSize = Util.getPayloadSize(
+ context.getRequestPayload());
+ }
+ }
+
+ @Override
+ public void serverSendResponse(RPCContext context) {
+ if (this.currentSpan.get() != null) {
+ Span current = this.currentSpan.get();
+ Util.addEvent(current, SpanEvent.SERVER_SEND);
+ current.responsePayloadSize =
+ Util.getPayloadSize(context.getResponsePayload());
+ this.storage.addSpan(this.currentSpan.get());
+ this.currentSpan.set(null);
+ }
+ }
+
+ @Override
+ public void clientReceiveResponse(RPCContext context) {
+ if (this.childSpan.get() != null) {
+ Span child = this.childSpan.get();
+ Util.addEvent(child, SpanEvent.CLIENT_RECV);
+ child.responsePayloadSize =
+ Util.getPayloadSize(context.getResponsePayload());
+ this.storage.addSpan(this.childSpan.get());
+ this.childSpan.set(null);
+ }
+ }
+
+ /**
+ * Start a client-facing server. Can be overridden if users
+ * prefer to attach client Servlet to their own server.
+ */
+ protected void initializeClientServer() {
+ clientFacingServer = new Server();
+ Context context = new Context(clientFacingServer, "/");
+ context.addServlet(new ServletHolder(new TraceClientServlet()), "/");
+ boolean connected = false;
+ SocketConnector socket = null;
+
+ // Keep trying ports until we can connect
+ while (!connected) {
+ try {
+ socket = new SocketConnector();
+ socket.setPort(clientPort);
+ clientFacingServer.addConnector(socket);
+ clientFacingServer.start();
+ connected = true;
+ } catch (Exception e) {
+ if (e instanceof BindException) {
+ clientFacingServer.removeConnector(socket);
+ clientPort = clientPort + 1;
+ continue;
+ }
+ else {
+ break; // Fail silently here (this is currently unused)
+ }
+ }
+ }
+ }
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java Thu Aug 5 00:07:24 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import org.apache.avro.ipc.trace.TracePlugin.StorageType;
+
+/**
+ * Helper class for configuring Avro's {@link TracePlugin}. If you are using
+ * a common configuration module, wrap this class with your own configuration.
+ */
+public class TracePluginConfiguration {
+ public double traceProb; // Probability of starting tracing
+ public int port; // Port to serve tracing data
+ public int clientPort; // Port to expose client HTTP interface
+ public StorageType storageType; // How to store spans
+ public long maxSpans; // Max number of spans to store
+ public boolean enabled; // Whether or not we are active
+
+ /**
+ * Return a TracePluginConfiguration with default options.
+ */
+ public TracePluginConfiguration() {
+ this.traceProb = 0.0;
+ this.port = 12335;
+ this.clientPort = 12345;
+ this.storageType = StorageType.MEMORY;
+ this.maxSpans = 10000;
+ this.enabled = true;
+ }
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java Thu Aug 5 00:07:24 2010
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc.trace;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
+
+/**
+ * Utility methods for common tasks in Avro tracing. Mostly consists of
+ * static methods which we can't put in auto-generated classes.
+ */
+class Util {
+ final private static Random RANDOM = new Random();
+ final private static int NANOS_PER_MILI = 1000000;
+ private static Utf8 hostname;
+
+ /**
+ * Get all SpanEvents contained in Span s.
+ */
+ public static EnumSet<SpanEvent> getAllEvents(Span s) {
+ EnumSet<SpanEvent> foundEvents = EnumSet.noneOf(SpanEvent.class);
+ for (TimestampedEvent event: s.events) {
+ if (event.event instanceof SpanEvent) {
+ foundEvents.add((SpanEvent) event.event);
+ }
+ }
+ return foundEvents;
+ }
+
+
+ /**
+ * Get the size of an RPC payload.
+ */
+ public static int getPayloadSize(List<ByteBuffer> payload) {
+ if (payload == null) {
+ return 0;
+ }
+ int size = 0;
+ for (ByteBuffer bb: payload) {
+ size = size + bb.limit();
+ }
+ return size;
+ }
+
+ /**
+ * Create a span without any events. If traceID or spanID is null, randomly
+ * generate them. If parentSpanID is null, assume this is a root span.
+ */
+ public static Span createEventlessSpan(ID traceID, ID spanID, ID parentSpanID) {
+ Span span = new Span();
+ span.complete = false;
+
+ if (traceID == null) {
+ byte[] traceIDBytes = new byte[8];
+ RANDOM.nextBytes(traceIDBytes);
+ span.traceID = new ID();
+ span.traceID.bytes(traceIDBytes);
+ } else {
+ span.traceID = new ID();
+ span.traceID.bytes(traceID.bytes().clone());
+ }
+
+ if (spanID == null) {
+ byte[] spanIDBytes = new byte[8];
+ RANDOM.nextBytes(spanIDBytes);
+ span.spanID = new ID();
+ span.spanID.bytes(spanIDBytes);
+ } else {
+ span.spanID = new ID();
+ span.spanID.bytes(spanID.bytes().clone());
+ }
+
+ if (parentSpanID != null) {
+ span.parentSpanID = new ID();
+ span.parentSpanID.bytes(parentSpanID.bytes().clone());
+ }
+
+ span.events = new GenericData.Array<TimestampedEvent>(
+ 10, Schema.createArray(TimestampedEvent.SCHEMA$));
+
+ if (hostname == null) {
+ try {
+ hostname = new Utf8(InetAddress.getLocalHost().toString());
+ } catch (UnknownHostException e) {
+ hostname = new Utf8("Unknown");
+ }
+ }
+ span.requestorHostname = hostname;
+ return span;
+ }
+
+ /**
+ * Add a TimestampedEvent to a Span using the current time.
+ */
+ public static void addEvent(Span span, SpanEvent eventType) {
+ TimestampedEvent ev = new TimestampedEvent();
+ ev.event = eventType;
+ ev.timeStamp = System.currentTimeMillis() * NANOS_PER_MILI;
+ span.events.add(ev);
+ }
+
+ /**
+ * Get the long value from a given ID object.
+ */
+ public static long longValue(ID in) {
+ if (in == null) {
+ throw new IllegalArgumentException("ID cannot be null");
+ }
+ if (in.bytes() == null) {
+ throw new IllegalArgumentException("ID cannot be empty");
+ }
+ if (in.bytes().length != 8) {
+ throw new IllegalArgumentException("ID must be 8 bytes");
+ }
+ ByteBuffer buff = ByteBuffer.wrap(in.bytes());
+ return buff.getLong();
+ }
+
+ /**
+ * Get an ID associated with a given long value.
+ */
+ public static ID idValue(long in) {
+ byte[] bArray = new byte[8];
+ ByteBuffer bBuffer = ByteBuffer.wrap(bArray);
+ LongBuffer lBuffer = bBuffer.asLongBuffer();
+ lBuffer.put(0, in);
+ ID out = new ID();
+ out.bytes(bArray);
+ return out;
+ }
+
+ /**
+ * Verify the equality of ID objects. Both being null references is
+ * considered equal.
+ */
+ public static boolean idsEqual(ID a, ID b) {
+ if (a == null && b == null) { return true; }
+ if (a == null || b == null) { return false; }
+
+ return Arrays.equals(a.bytes(), b.bytes());
+ }
+
+}
Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestBasicTracing.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestBasicTracing.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestBasicTracing.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestBasicTracing.java Thu Aug 5 00:07:24 2010
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.generic.GenericResponder;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.ipc.RPCPlugin;
+import org.apache.avro.ipc.Responder;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+public class TestBasicTracing {
+ Protocol protocol = Protocol.parse("" + "{\"protocol\": \"Minimal\", "
+ + "\"messages\": { \"m\": {"
+ + " \"request\": [{\"name\": \"x\", \"type\": \"int\"}], "
+ + " \"response\": \"int\"} } }");
+ Message message = protocol.getMessages().get("m");
+
+ /** Expects 0 and returns 1. */
+ static class TestResponder extends GenericResponder {
+ public TestResponder(Protocol local) {
+ super(local);
+ }
+
+ @Override
+ public Object respond(Message message, Object request)
+ throws AvroRemoteException {
+ assertEquals(0, ((GenericRecord) request).get("x"));
+ return 1;
+ }
+ }
+
+ @Test
+ public void testBasicTrace() throws Exception {
+ TracePluginConfiguration conf = new TracePluginConfiguration();
+ conf.port = 51007;
+ conf.clientPort = 12344;
+ conf.traceProb = 1.0;
+ TracePlugin responderPlugin = new TracePlugin(conf);
+ conf.port = 51008;
+ conf.clientPort = 12345;
+ TracePlugin requestorPlugin = new TracePlugin(conf);
+
+ Responder res = new TestResponder(protocol);
+ res.addRPCPlugin(responderPlugin);
+
+ HttpServer server = new HttpServer(res, 50000);
+ server.start();
+
+ HttpTransceiver trans = new HttpTransceiver(
+ new URL("http://localhost:50000"));
+
+ GenericRequestor r = new GenericRequestor(protocol, trans);
+ r.addRPCPlugin(requestorPlugin);
+
+ GenericRecord params = new GenericData.Record(protocol.getMessages().get(
+ "m").getRequest());
+ params.put("x", 0);
+ r.request("m", params);
+
+ List<Span> responderSpans = responderPlugin.storage.getAllSpans();
+ assertEquals(1, responderSpans.size());
+
+ List<Span> requestorSpans = requestorPlugin.storage.getAllSpans();
+ assertEquals(1, requestorSpans.size());
+
+ if ((responderSpans.size() == 1 && requestorSpans.size() == 1)) {
+ Span responderSpan = responderSpans.get(0);
+ Span requestorSpan = requestorSpans.get(0);
+
+ // Check meta propagation
+ assertEquals(null, requestorSpan.parentSpanID);
+ assertEquals(responderSpan.parentSpanID, requestorSpan.parentSpanID);
+ assertEquals(responderSpan.traceID, requestorSpan.traceID);
+
+ // Check other data
+ assertEquals(2, requestorSpan.events.size());
+ assertEquals(2, responderSpan.events.size());
+ assertTrue("m".equals(requestorSpan.messageName.toString()));
+ assertTrue("m".equals(responderSpan.messageName.toString()));
+ assertFalse(requestorSpan.complete);
+ assertFalse(responderSpan.complete);
+ }
+
+ server.close();
+
+ requestorPlugin.clientFacingServer.stop();
+ requestorPlugin.httpServer.close();
+
+ responderPlugin.clientFacingServer.stop();
+ responderPlugin.httpServer.close();
+ }
+
+ /*
+ * Test a more complicated, recursive trace involving four agents and three
+ * spans.
+ *
+ * Messages are x, y, z which request/return
+ * incrementing int values (shown below).
+ *
+ * |-w-(1)-> | |
+ * | |-x-(2)-> | C
+ * | | <-x-(3)-|
+ * | |
+ * A | B |
+ * | | |
+ * | |-x-(4)-> |
+ * | | <-x-(5)-| D
+ * | | |
+ * |<-w-(6)- | |
+ *
+ * Listening ports are B: 21005
+ * C: 21006
+ * D: 21007
+ */
+
+ Protocol advancedProtocol = Protocol.parse("{\"protocol\": \"Advanced\", "
+ + "\"messages\": { "
+ + "\"w\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], "
+ + " \"response\": \"int\"},"
+ + "\"x\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], "
+ + " \"response\": \"int\"},"
+ + "\"y\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], "
+ + " \"response\": \"int\"}"
+ + " } }");
+
+ /** Middle Responder */
+ static class RecursingResponder extends GenericResponder {
+ HttpTransceiver transC;
+ HttpTransceiver transD;
+ GenericRequestor reqC;
+ GenericRequestor reqD;
+ Protocol protocol;
+
+ public RecursingResponder(Protocol local, RPCPlugin plugin)
+ throws Exception {
+ super(local);
+ transC = new HttpTransceiver(
+ new URL("http://localhost:21006"));
+ transD = new HttpTransceiver(
+ new URL("http://localhost:21007"));
+ reqC = new GenericRequestor(local, transC);
+ reqC.addRPCPlugin(plugin);
+ reqD = new GenericRequestor(local, transD);
+ reqD.addRPCPlugin(plugin);
+
+ protocol = local;
+ }
+
+ @Override
+ public Object respond(Message message, Object request)
+ throws IOException {
+ assertTrue("w".equals(message.getName()));
+ GenericRecord inParams = (GenericRecord)request;
+ Integer currentCount = (Integer) inParams.get("req");
+ assertTrue(currentCount.equals(1));
+
+ GenericRecord paramsC = new GenericData.Record(
+ protocol.getMessages().get("x").getRequest());
+ paramsC.put("req", currentCount + 1);
+ Integer returnC = (Integer) reqC.request("x", paramsC);
+ assertTrue(returnC.equals(currentCount + 2));
+
+ GenericRecord paramsD = new GenericData.Record(
+ protocol.getMessages().get("x").getRequest());
+ paramsD.put("req", currentCount + 3);
+ Integer returnD = (Integer) reqD.request("x", paramsD);
+ assertTrue(returnD.equals(currentCount + 4));
+
+ return currentCount + 5;
+ }
+ }
+
+ /** Endpoint responder */
+ static class EndpointResponder extends GenericResponder {
+ public EndpointResponder(Protocol local) {
+ super(local);
+ }
+
+ @Override
+ public Object respond(Message message, Object request)
+ throws AvroRemoteException {
+ GenericRecord inParams = (GenericRecord)request;
+ Integer currentCount = (Integer) inParams.get("req");
+
+ return currentCount + 1;
+ }
+ }
+
+ @Test
+ public void testRecursingTrace() throws Exception {
+ TracePluginConfiguration conf = new TracePluginConfiguration();
+ conf.traceProb = 1.0;
+ conf.port = 51010;
+ conf.clientPort = 12346;
+ TracePlugin aPlugin = new TracePlugin(conf);
+ conf.port = 51011;
+ conf.clientPort = 12347;
+ TracePlugin bPlugin = new TracePlugin(conf);
+ conf.port = 51012;
+ conf.clientPort = 12348;
+ TracePlugin cPlugin = new TracePlugin(conf);
+ conf.port = 51013;
+ conf.clientPort = 12349;
+ TracePlugin dPlugin = new TracePlugin(conf);
+
+ // Responders
+ Responder bRes = new RecursingResponder(advancedProtocol, bPlugin);
+ bRes.addRPCPlugin(bPlugin);
+ HttpServer server1 = new HttpServer(bRes, 21005);
+ server1.start();
+
+ Responder cRes = new EndpointResponder(advancedProtocol);
+ cRes.addRPCPlugin(cPlugin);
+ HttpServer server2 = new HttpServer(cRes, 21006);
+ server2.start();
+
+ Responder dRes = new EndpointResponder(advancedProtocol);
+ dRes.addRPCPlugin(dPlugin);
+ HttpServer server3 = new HttpServer(dRes, 21007);
+ server3.start();
+
+ // Root requestor
+ HttpTransceiver trans = new HttpTransceiver(
+ new URL("http://localhost:21005"));
+
+ GenericRequestor r = new GenericRequestor(advancedProtocol, trans);
+ r.addRPCPlugin(aPlugin);
+
+ GenericRecord params = new GenericData.Record(
+ advancedProtocol.getMessages().get("w").getRequest());
+ params.put("req", 1);
+ r.request("w", params);
+
+ // Verify counts
+ assertEquals(1, aPlugin.storage.getAllSpans().size());
+ assertEquals(3, bPlugin.storage.getAllSpans().size());
+ assertEquals(1, cPlugin.storage.getAllSpans().size());
+ assertEquals(1, dPlugin.storage.getAllSpans().size());
+
+ ID traceID = aPlugin.storage.getAllSpans().get(0).traceID;
+ ID rootSpanID = null;
+
+ // Verify event counts and trace ID propagation
+ for (Span s: aPlugin.storage.getAllSpans()) {
+ assertEquals(2, s.events.size());
+ assertTrue(Util.idsEqual(traceID, s.traceID));
+ assertFalse(s.complete);
+ rootSpanID = s.spanID;
+ }
+
+ for (Span s: bPlugin.storage.getAllSpans()) {
+ assertEquals(2, s.events.size());
+ assertEquals(traceID, s.traceID);
+ assertFalse(s.complete);
+ }
+
+ for (Span s: cPlugin.storage.getAllSpans()) {
+ assertEquals(2, s.events.size());
+ assertEquals(traceID, s.traceID);
+ assertFalse(s.complete);
+ }
+ for (Span s: dPlugin.storage.getAllSpans()) {
+ assertEquals(2, s.events.size());
+ assertEquals(traceID, s.traceID);
+ assertFalse(s.complete);
+ }
+
+ // Verify span propagation.
+ ID firstSpanID = aPlugin.storage.getAllSpans().get(0).spanID;
+ ID secondSpanID = cPlugin.storage.getAllSpans().get(0).spanID;
+ ID thirdSpanID = dPlugin.storage.getAllSpans().get(0).spanID;
+
+ boolean firstFound = false, secondFound = false, thirdFound = false;
+ for (Span s: bPlugin.storage.getAllSpans()) {
+ if (Util.idsEqual(s.spanID, firstSpanID)) {
+ firstFound = true;
+ }
+ else if (Util.idsEqual(s.spanID, secondSpanID)) {
+ secondFound = true;
+ }
+ else if (Util.idsEqual(s.spanID, thirdSpanID)) {
+ thirdFound = true;
+ }
+ }
+ assertTrue(firstFound);
+ assertTrue(secondFound);
+ assertTrue(thirdFound);
+
+ server1.close();
+ server2.close();
+ server3.close();
+ aPlugin.httpServer.close();
+ aPlugin.clientFacingServer.stop();
+ bPlugin.httpServer.close();
+ bPlugin.clientFacingServer.stop();
+ cPlugin.httpServer.close();
+ cPlugin.clientFacingServer.stop();
+ dPlugin.httpServer.close();
+ dPlugin.clientFacingServer.stop();
+
+ }
+
+ /** Sleeps as requested. */
+ private static class SleepyResponder extends GenericResponder {
+ public SleepyResponder(Protocol local) {
+ super(local);
+ }
+
+ @Override
+ public Object respond(Message message, Object request)
+ throws AvroRemoteException {
+ try {
+ Thread.sleep((Long)((GenericRecord)request).get("millis"));
+ } catch (InterruptedException e) {
+ throw new AvroRemoteException(e);
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Demo program for using RPC trace. This automatically generates
+ * client RPC requests.
+ * @param args
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ if (args.length == 0) {
+ args = new String[] { "7002", "7003" };
+ }
+ Protocol protocol = Protocol.parse("{\"protocol\": \"sleepy\", "
+ + "\"messages\": { \"sleep\": {"
+ + " \"request\": [{\"name\": \"millis\", \"type\": \"long\"}," +
+ "{\"name\": \"data\", \"type\": \"bytes\"}], "
+ + " \"response\": \"null\"} } }");
+ Log.info("Using protocol: " + protocol.toString());
+ Responder r = new SleepyResponder(protocol);
+ TracePlugin p = new TracePlugin(new TracePluginConfiguration());
+ r.addRPCPlugin(p);
+
+ // Start Avro server
+ new HttpServer(r, Integer.parseInt(args[0]));
+
+ HttpTransceiver trans = new HttpTransceiver(
+ new URL("http://localhost:" + Integer.parseInt(args[0])));
+ GenericRequestor req = new GenericRequestor(protocol, trans);
+ TracePluginConfiguration clientConf = new TracePluginConfiguration();
+ clientConf.clientPort = 12346;
+ clientConf.port = 12336;
+ clientConf.traceProb = 1.0;
+ req.addRPCPlugin(new TracePlugin(clientConf));
+
+ while(true) {
+ Thread.sleep(1000);
+ GenericRecord params = new GenericData.Record(protocol.getMessages().get(
+ "sleep").getRequest());
+ Random rand = new Random();
+ params.put("millis", Math.abs(rand.nextLong()) % 1000);
+ int payloadSize = Math.abs(rand.nextInt()) % 10000;
+ byte[] payload = new byte[payloadSize];
+ rand.nextBytes(payload);
+ params.put("data", ByteBuffer.wrap(payload));
+ req.request("sleep", params);
+ }
+ }
+}
Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java Thu Aug 5 00:07:24 2010
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.generic.GenericResponder;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.ipc.RPCPlugin;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.trace.SpanAggregator.SpanAggregationResults;
+import org.apache.avro.ipc.trace.SpanAggregator.TraceFormationResults;
+import org.junit.Test;
+
+/**
+ * Tests which test logging, aggregation, and analysis of traces.
+ */
+public class TestEndToEndTracing {
+
+
+ /*
+ * Messages are w, x, y which request/return
+ * incrementing int values (shown below).
+ *
+ * |-w-(1)-> | |
+ * | |-x-(2)-> | C
+ * | | <-x-(3)-|
+ * | |
+ * A | B |
+ * | | |
+ * | |-x-(4)-> |
+ * | | <-x-(5)-| D
+ * | | |
+ * |<-w-(6)- | |
+ *
+ * Listening ports are B: 21005
+ * C: 21006
+ * D: 21007
+ */
+
+ Protocol advancedProtocol = Protocol.parse("{\"protocol\": \"Advanced\", "
+ + "\"messages\": { "
+ + "\"w\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], "
+ + " \"response\": \"int\"},"
+ + "\"x\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], "
+ + " \"response\": \"int\"},"
+ + "\"y\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], "
+ + " \"response\": \"int\"}"
+ + " } }");
+
+ /** Middle Responder */
+ static class RecursingResponder extends GenericResponder {
+ HttpTransceiver transC;
+ HttpTransceiver transD;
+ GenericRequestor reqC;
+ GenericRequestor reqD;
+ Protocol protocol;
+
+ public RecursingResponder(Protocol local, RPCPlugin plugin)
+ throws Exception {
+ super(local);
+ transC = new HttpTransceiver(
+ new URL("http://localhost:21006"));
+ transD = new HttpTransceiver(
+ new URL("http://localhost:21007"));
+ reqC = new GenericRequestor(local, transC);
+ reqC.addRPCPlugin(plugin);
+ reqD = new GenericRequestor(local, transD);
+ reqD.addRPCPlugin(plugin);
+
+ protocol = local;
+ }
+
+ @Override
+ public Object respond(Message message, Object request)
+ throws IOException {
+ assertTrue("w".equals(message.getName()));
+ GenericRecord inParams = (GenericRecord)request;
+ Integer currentCount = (Integer) inParams.get("req");
+ assertTrue(currentCount.equals(1));
+
+ GenericRecord paramsC = new GenericData.Record(
+ protocol.getMessages().get("x").getRequest());
+ paramsC.put("req", currentCount + 1);
+ Integer returnC = (Integer) reqC.request("x", paramsC);
+ assertTrue(returnC.equals(currentCount + 2));
+
+ GenericRecord paramsD = new GenericData.Record(
+ protocol.getMessages().get("x").getRequest());
+ paramsD.put("req", currentCount + 3);
+ Integer returnD = (Integer) reqD.request("x", paramsD);
+ assertTrue(returnD.equals(currentCount + 4));
+
+ return currentCount + 5;
+ }
+ }
+
+ /** Endpoint responder */
+ static class EndpointResponder extends GenericResponder {
+ public EndpointResponder(Protocol local) {
+ super(local);
+ }
+
+ @Override
+ public Object respond(Message message, Object request)
+ throws AvroRemoteException {
+ GenericRecord inParams = (GenericRecord)request;
+ Integer currentCount = (Integer) inParams.get("req");
+
+ return currentCount + 1;
+ }
+ }
+
+ @Test
+ public void testTraceAndCollection() throws Exception {
+ TracePluginConfiguration conf = new TracePluginConfiguration();
+ conf.traceProb = 1.0;
+ conf.port = 51010;
+ conf.clientPort = 12346;
+ TracePlugin aPlugin = new TracePlugin(conf);
+ conf.port = 51011;
+ conf.clientPort = 12347;
+ TracePlugin bPlugin = new TracePlugin(conf);
+ conf.port = 51012;
+ conf.clientPort = 12348;
+ TracePlugin cPlugin = new TracePlugin(conf);
+ conf.port = 51013;
+ conf.clientPort = 12349;
+ TracePlugin dPlugin = new TracePlugin(conf);
+
+ // Responders
+ Responder bRes = new RecursingResponder(advancedProtocol, bPlugin);
+ bRes.addRPCPlugin(bPlugin);
+ HttpServer server1 = new HttpServer(bRes, 21005);
+ server1.start();
+
+ Responder cRes = new EndpointResponder(advancedProtocol);
+ cRes.addRPCPlugin(cPlugin);
+ HttpServer server2 = new HttpServer(cRes, 21006);
+ server2.start();
+
+ Responder dRes = new EndpointResponder(advancedProtocol);
+ dRes.addRPCPlugin(dPlugin);
+ HttpServer server3 = new HttpServer(dRes, 21007);
+ server3.start();
+
+ // Root requestor
+ HttpTransceiver trans = new HttpTransceiver(
+ new URL("http://localhost:21005"));
+
+ GenericRequestor r = new GenericRequestor(advancedProtocol, trans);
+ r.addRPCPlugin(aPlugin);
+
+ GenericRecord params = new GenericData.Record(
+ advancedProtocol.getMessages().get("w").getRequest());
+ params.put("req", 1);
+ r.request("w", params);
+
+ ArrayList<Span> allSpans = new ArrayList<Span>();
+
+ allSpans.addAll(aPlugin.storage.getAllSpans());
+ allSpans.addAll(bPlugin.storage.getAllSpans());
+ allSpans.addAll(cPlugin.storage.getAllSpans());
+ allSpans.addAll(dPlugin.storage.getAllSpans());
+
+ SpanAggregationResults results = SpanAggregator.getFullSpans(allSpans);
+
+ TraceFormationResults traces = SpanAggregator.getTraces(results.completeSpans);
+
+ assertEquals(1, traces.traces.size());
+ assertEquals(0, traces.rejectedSpans.size());
+
+ // Test debug printing of traces
+ String string1 = traces.traces.get(0).printWithTiming();
+ assertTrue(string1.contains("w"));
+ assertTrue(string1.contains("x"));
+ assertTrue(string1.indexOf("x") != string1.lastIndexOf("x")); // assure two x's
+
+ String string2 = traces.traces.get(0).printBrief();
+ assertTrue(string2.contains("w"));
+ assertTrue(string2.contains("x"));
+ assertTrue(string2.indexOf("x") != string2.lastIndexOf("x")); // assure two x's
+
+ // Just for fun, print to console
+ System.out.println(traces.traces.get(0).printWithTiming());
+ System.out.println(traces.traces.get(0).printBrief());
+
+ server1.close();
+ server2.close();
+ server3.close();
+ aPlugin.httpServer.close();
+ aPlugin.clientFacingServer.stop();
+ bPlugin.httpServer.close();
+ bPlugin.clientFacingServer.stop();
+ cPlugin.httpServer.close();
+ cPlugin.clientFacingServer.stop();
+ dPlugin.httpServer.close();
+ dPlugin.clientFacingServer.stop();
+ }
+
+ /** Sleeps as requested. */
+ private static class SleepyResponder extends GenericResponder {
+ public SleepyResponder(Protocol local) {
+ super(local);
+ }
+
+ @Override
+ public Object respond(Message message, Object request)
+ throws AvroRemoteException {
+ try {
+ Thread.sleep((Long)((GenericRecord)request).get("millis"));
+ } catch (InterruptedException e) {
+ throw new AvroRemoteException(e);
+ }
+ return null;
+ }
+ }
+}
Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanAggregation.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanAggregation.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanAggregation.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanAggregation.java Thu Aug 5 00:07:24 2010
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.apache.avro.ipc.trace.Util.idValue;
+import static org.apache.avro.ipc.trace.Util.idsEqual;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.ipc.trace.SpanAggregator.SpanAggregationResults;
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+
+import edu.emory.mathcs.backport.java.util.Arrays;
+
+/**
+ * Tests for span aggregation utility methods.
+ */
+public class TestSpanAggregation {
+ /**
+ * Test merging of two basic spans.
+ */
+ @Test
+ public void testSpanCompletion1() {
+ Span span1a = createClientSpan(idValue(1), idValue(1), null, new Utf8("a"));
+ Span span1b = createServerSpan(idValue(1), idValue(1), null, new Utf8("a"));
+
+ List<Span> partials = new ArrayList<Span>();
+ partials.add(span1a);
+ partials.add(span1b);
+ SpanAggregationResults results = SpanAggregator.getFullSpans(partials);
+
+ assertNotNull(results.completeSpans);
+ assertNotNull(results.incompleteSpans);
+ assertTrue(results.incompleteSpans.size() == 0);
+ assertTrue(results.completeSpans.size() == 1);
+
+ Span result = results.completeSpans.get(0);
+ assertEquals(null, result.parentSpanID);
+ assertTrue(idsEqual(idValue(1), result.spanID));
+ assertEquals(4, result.events.size());
+ }
+
+ /**
+ * Test span merging with some extra invalid spans;
+ */
+ @Test
+ public void testInvalidSpanCompletion() {
+ // Trace: 1, Span: 1, Parent: null
+ Span span1a = createClientSpan(idValue(1), idValue(1), null, new Utf8("a"));
+ Span span1b = createServerSpan(idValue(1), idValue(1), null, new Utf8("a"));
+
+ // Trace: 1, Span: 10, Parent: 3
+ Span spanBogus1 = createClientSpan(idValue(1), idValue(10), idValue(3), new Utf8("not"));
+ Span spanBogus2 = createServerSpan(idValue(1), idValue(10), idValue(3), new Utf8("equal"));
+
+ // Trace: 1, Span: 5, Parent: (2/3)
+ Span spanBogus3 = createClientSpan(idValue(1), idValue(5), idValue(2), new Utf8("equal"));
+ Span spanBogus4 = createServerSpan(idValue(1), idValue(5), idValue(3), new Utf8("equal"));
+
+ // Trace:1, Span: 4, Parent: 1
+ Span spanBogus5 = createClientSpan(idValue(1), idValue(4), idValue(1), new Utf8("alone"));
+
+ List<Span> partials = new ArrayList<Span>();
+ partials.add(span1a);
+ partials.add(span1b);
+ partials.add(spanBogus1);
+ partials.add(spanBogus2);
+ partials.add(spanBogus3);
+ partials.add(spanBogus4);
+ partials.add(spanBogus5);
+
+ SpanAggregationResults results = SpanAggregator.getFullSpans(partials);
+ assertNotNull(results.completeSpans);
+ assertNotNull(results.incompleteSpans);
+
+ assertTrue(results.incompleteSpans.size() == 5);
+ assertTrue(results.incompleteSpans.contains(spanBogus1));
+ assertTrue(results.incompleteSpans.contains(spanBogus2));
+ assertTrue(results.incompleteSpans.contains(spanBogus3));
+ assertTrue(results.incompleteSpans.contains(spanBogus4));
+ assertTrue(results.incompleteSpans.contains(spanBogus5));
+
+ assertTrue(results.completeSpans.size() == 1);
+ Span result = results.completeSpans.get(0);
+ assertTrue(result.complete);
+ assertTrue(idsEqual(idValue(1), result.spanID));
+ assertEquals(new Utf8("requestorHostname"), result.requestorHostname);
+ assertEquals(new Utf8("responderHostname"), result.responderHostname);
+ assertNull(result.parentSpanID);
+ assertEquals(new Utf8("a"), result.messageName);
+ }
+
+ /**
+ * Test basic formation of a trace.
+ * a
+ * b
+ * c d
+ * e
+ */
+ @Test
+ public void testTraceFormation1() {
+ Span a1 = createClientSpan(idValue(1), idValue(1), null, new Utf8("a"));
+ Span a2 = createServerSpan(idValue(1), idValue(1), null, new Utf8("a"));
+
+ Span b1 = createClientSpan(idValue(1), idValue(2), idValue(1), new Utf8("b"));
+ Span b2 = createServerSpan(idValue(1), idValue(2), idValue(1), new Utf8("b"));
+
+ Span c1 = createClientSpan(idValue(1), idValue(3), idValue(2), new Utf8("c"));
+ Span c2 = createServerSpan(idValue(1), idValue(3), idValue(2), new Utf8("c"));
+
+ Span d1 = createClientSpan(idValue(1), idValue(4), idValue(2), new Utf8("d"));
+ Span d2 = createServerSpan(idValue(1), idValue(4), idValue(2), new Utf8("d"));
+
+ Span e1 = createClientSpan(idValue(1), idValue(5), idValue(4), new Utf8("e"));
+ Span e2 = createServerSpan(idValue(1), idValue(5), idValue(4), new Utf8("e"));
+
+ List<Span> spans = new LinkedList<Span>();
+ spans.addAll(Arrays.asList(new Span[] {a1, a2, b1, b2, c1, c2, d1, d2, e1, e2}));
+
+ List<Span> merged = SpanAggregator.getFullSpans(spans).completeSpans;
+
+ assertEquals(5, merged.size());
+ for (Span s: merged) {
+ assertEquals(new Utf8("requestorHostname"), s.requestorHostname);
+ assertEquals(new Utf8("responderHostname"), s.responderHostname);
+ }
+
+ List<Trace> traces = SpanAggregator.getTraces(merged).traces;
+ assertEquals(1, traces.size());
+
+ assertEquals("Trace: (a (b (c) (d (e))))", traces.get(0).printBrief());
+
+ }
+
+ /**
+ * Make a mock Span including client-side timing data.
+ */
+ public Span createClientSpan(ID traceID, ID spanID, ID parentID, Utf8 msgName) {
+ Span out = new Span();
+ out.spanID = spanID;
+ out.traceID = traceID;
+ out.requestorHostname = new Utf8("requestorHostname");
+
+ if (parentID != null) {
+ out.parentSpanID = parentID;
+ }
+ out.messageName = msgName;
+ out.complete = false;
+
+ TimestampedEvent event1 = new TimestampedEvent();
+ event1.event = SpanEvent.CLIENT_SEND;
+ event1.timeStamp = System.currentTimeMillis() * 1000000;
+
+ TimestampedEvent event2 = new TimestampedEvent();
+ event2.event = SpanEvent.CLIENT_RECV;
+ event2.timeStamp = System.currentTimeMillis() * 1000000;
+
+ out.events = new GenericData.Array(
+ 2, Schema.createArray(TimestampedEvent.SCHEMA$));
+ out.events.add(event1);
+ out.events.add(event2);
+
+ return out;
+ }
+
+ /**
+ * Make a mock Span including server-side timing data.
+ */
+ public Span createServerSpan(ID traceID, ID spanID, ID parentID, Utf8 msgName) {
+ Span out = new Span();
+ out.spanID = spanID;
+ out.traceID = traceID;
+ out.responderHostname = new Utf8("responderHostname");
+
+ if (parentID != null) {
+ out.parentSpanID = parentID;
+ }
+ out.messageName = msgName;
+ out.complete = false;
+
+ TimestampedEvent event1 = new TimestampedEvent();
+ event1.event = SpanEvent.SERVER_RECV;
+ event1.timeStamp = System.currentTimeMillis();
+
+ TimestampedEvent event2 = new TimestampedEvent();
+ event2.event = SpanEvent.SERVER_SEND;
+ event2.timeStamp = System.currentTimeMillis();
+
+ out.events = new GenericData.Array(
+ 2, Schema.createArray(TimestampedEvent.SCHEMA$));
+ out.events.add(event1);
+ out.events.add(event2);
+
+ return out;
+ }
+}