You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/01/08 19:53:19 UTC

svn commit: r897299 - in /hadoop/avro/trunk: ./ src/java/org/apache/avro/ipc/stats/ src/test/java/org/apache/avro/ipc/stats/

Author: cutting
Date: Fri Jan  8 18:53:18 2010
New Revision: 897299

URL: http://svn.apache.org/viewvc?rev=897299&view=rev
Log:
AVRO-273, AVRO-275, & AVRO-279. Add Java RPC statistics collection and display.  Contributed by Philip Zeyliger.

Added:
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/FloatHistogram.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Histogram.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsPlugin.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsServlet.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Stopwatch.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/package.html
    hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/
    hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/FakeTicks.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/StatsPluginOverhead.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestHistogram.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStatsPluginAndServlet.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStopwatch.java
Modified:
    hadoop/avro/trunk/CHANGES.txt

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=897299&r1=897298&r2=897299&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Fri Jan  8 18:53:18 2010
@@ -50,6 +50,9 @@
 
     AVRO-271. Add a Java local RPC transceiver. (Philip Zeyliger via cutting)
 
+    AVRO-273, AVRO-275, & AVRO-279. Add Java RPC statistics collection
+    and display. (Philip Zeyliger via cutting)
+
   IMPROVEMENTS
 
     AVRO-157. Changes from code review comments for C++. (sbanacho)

Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/FloatHistogram.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/FloatHistogram.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/FloatHistogram.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/FloatHistogram.java Fri Jan  8 18:53:18 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+/**
+ * Specific implementation of histogram for floats,
+ * which also keeps track of basic summary statistics.
+ * @param <B>
+ */
+class FloatHistogram<B> extends Histogram<B, Float> {
+  private float runningSum;
+  private float runningSumOfSquares;
+
+  public FloatHistogram(Segmenter<B, Float> segmenter) {
+    super(segmenter);
+  }
+
+  @Override
+  public void add(Float value) {
+    super.add(value);
+    runningSum += value;
+    runningSumOfSquares += value*value;
+  }
+
+  public float getMean() {
+    if (totalCount == 0) {
+      return Float.NaN;
+    }
+    return runningSum / totalCount;
+  }
+
+  public float getUnbiasedStdDev() {
+    if (totalCount <= 1) {
+      return Float.NaN;
+    }
+    float mean = getMean();
+    return (float)Math.sqrt((runningSumOfSquares - totalCount*mean*mean)/(totalCount - 1));
+  }
+}

Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Histogram.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Histogram.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Histogram.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Histogram.java Fri Jan  8 18:53:18 2010
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeMap;
+
+/**
+ * Represents a histogram of values.  This class uses a {@link Segmenter}
+ * to determine which bucket to place a given value into.
+ *
+ * Note that Histogram, by itself, is not synchronized.
+ * @param <B> Bucket type.  Often String, since buckets are typically
+ * used for their toString() representation.
+ * @param <T> Type of value
+ */
+class Histogram<B, T> {
+  private Segmenter<B, T> segmenter;
+  private int[] counts;
+  protected int totalCount;
+
+  /**
+   * Interface to determine which bucket to place a value in.
+   *
+   * Segmenters should be immutable, so many histograms can re-use
+   * the same segmenter.
+   */
+  interface Segmenter<B, T> {
+    /** Number of buckets to use. */
+    int size();
+    /**
+     * Which bucket to place value in.
+     *
+     * @return Index of bucket for the value.  At least 0 and less than size().
+     * @throws SegmenterException if value does not fit in a bucket.
+     */
+    int segment(T value);
+    /**
+     * Returns an iterator of buckets. The order of iteration
+     * is consistent with the segment numbers.
+     */
+    Iterator<B> getBuckets();
+  }
+
+  public static class SegmenterException extends RuntimeException {
+    public SegmenterException(String s) {
+      super(s);
+    }
+  }
+
+  public static class TreeMapSegmenter<T extends Comparable<T>>
+      implements Segmenter<String, T> {
+    private TreeMap<T, Integer> index = new TreeMap<T, Integer>();
+    public TreeMapSegmenter(SortedSet<T> leftEndpoints) {
+      if (leftEndpoints.isEmpty()) {
+        throw new IllegalArgumentException(
+            "Endpoints must not be empty: " + leftEndpoints);
+      }
+      int i = 0;
+      for (T t : leftEndpoints) {
+        index.put(t, i++);
+      }
+    }
+
+    public int segment(T value) {
+      Map.Entry<T, Integer> e = index.floorEntry(value);
+      if (e == null) {
+        throw new SegmenterException("Could not find bucket for: " + value);
+      }
+      return e.getValue();
+    }
+
+    @Override
+    public int size() {
+      return index.size();
+    }
+
+    private String rangeAsString(T a, T b) {
+      return String.format("[%s,%s)", a, b == null ? "infinity" : b);
+    }
+
+    @Override
+    public Iterator<String> getBuckets() {
+      return new Iterator<String>() {
+        Iterator<T> it = index.keySet().iterator();
+        T cur = it.next(); // there's always at least one element
+
+        @Override
+        public boolean hasNext() {
+          return it.hasNext();
+        }
+
+        @Override
+        public String next() {
+          T left = cur;
+          cur = it.hasNext() ? it.next() : null;
+          return rangeAsString(left, cur);
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+
+        }
+      };
+    }
+  }
+
+  /**
+   * Creates a histogram using the specified segmenter.
+   */
+  public Histogram(Segmenter<B, T> segmenter) {
+    this.segmenter = segmenter;
+    this.counts = new int[segmenter.size()];
+  }
+
+  /** Tallies a value in the histogram. */
+  public void add(T value) {
+    int i = segmenter.segment(value);
+    counts[i]++;
+    totalCount++;
+  }
+
+  /**
+   * Returns the underlying bucket values.
+   */
+  public int[] getHistogram() {
+    return counts;
+  }
+
+  /** Returns the total count of entries. */
+  public int getCount() {
+    return totalCount;
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    for (Entry<B> e : entries()) {
+      if (!first) {
+        sb.append(";");
+      } else {
+        first = false;
+      }
+      sb.append(e.bucket).append("=").append(e.count);
+    }
+    return sb.toString();
+  }
+
+  static class Entry<B> {
+    public Entry(B bucket, int count) {
+      this.bucket = bucket;
+      this.count = count;
+    }
+    B bucket;
+    int count;
+  }
+
+  private class EntryIterator implements Iterable<Entry<B>>, Iterator<Entry<B>> {
+    int i = 0;
+    Iterator<B> bucketNameIterator = segmenter.getBuckets();
+
+    @Override
+    public Iterator<Entry<B>> iterator() {
+      return this;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return i < segmenter.size();
+    }
+
+    @Override
+    public Entry<B> next() {
+      return new Entry<B>(bucketNameIterator.next(), counts[i++]);
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+  }
+
+  public Iterable<Entry<B>> entries() {
+    return new EntryIterator();
+  }
+}

Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsPlugin.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsPlugin.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsPlugin.java Fri Jan  8 18:53:18 2010
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.ipc.RPCContext;
+import org.apache.avro.ipc.RPCPlugin;
+import org.apache.avro.ipc.stats.Histogram.Segmenter;
+import org.apache.avro.ipc.stats.Stopwatch.Ticks;
+
+/**
+ * Collects count and latency statistics about RPC calls.  Keeps
+ * data for every method.
+ *
+ * This uses milliseconds as the standard unit of measure
+ * throughout the class, stored in floats.
+ */
+public class StatsPlugin extends RPCPlugin {
+  /** Static declaration of histogram buckets. */
+  static final Segmenter<String, Float> DEFAULT_SEGMENTER =
+    new Histogram.TreeMapSegmenter<Float>(new TreeSet<Float>(Arrays.asList(
+            0f,
+           25f,
+           50f,
+           75f,
+          100f,
+          200f,
+          300f,
+          500f,
+          750f,
+         1000f, // 1 second
+         2000f,
+         5000f,
+        10000f,
+        60000f, // 1 minute
+       600000f)));
+
+  /** Per-method histograms.
+   * Must be accessed while holding a lock on methodTimings. */
+  Map<Message, FloatHistogram<?>> methodTimings =
+    new HashMap<Message, FloatHistogram<?>>();
+
+  /** RPCs in flight. */
+  ConcurrentMap<RPCContext, Stopwatch> activeRpcs =
+    new ConcurrentHashMap<RPCContext, Stopwatch>();
+  private Ticks ticks;
+
+  private Segmenter<?, Float> segmenter;
+
+  /** Construct a plugin with custom Ticks and Segmenter implementations. */
+  StatsPlugin(Ticks ticks, Segmenter<?, Float> segmenter) {
+    this.segmenter = segmenter;
+    this.ticks = ticks;
+  }
+
+  /** Construct a plugin with default (system) ticks, and default
+   * histogram segmentation. */
+  public StatsPlugin() {
+    this(Stopwatch.SYSTEM_TICKS, DEFAULT_SEGMENTER);
+  }
+
+  @Override
+  public void serverReceiveRequest(RPCContext context) {
+    Stopwatch t = new Stopwatch(ticks);
+    t.start();
+    this.activeRpcs.put(context, t);
+  }
+
+  @Override
+  public void serverSendResponse(RPCContext context) {
+    Stopwatch t = this.activeRpcs.remove(context);
+    t.stop();
+    publish(context, t);
+  }
+
+  /** Adds timing to the histograms. */
+  private void publish(RPCContext context, Stopwatch t) {
+    Message message = context.getMessage();
+    if (message == null) throw new IllegalArgumentException();
+    synchronized(methodTimings) {
+      FloatHistogram<?> h = methodTimings.get(context.getMessage());
+      if (h == null) {
+        h = createNewHistogram();
+        methodTimings.put(context.getMessage(), h);
+      }
+      h.add(nanosToMillis(t.elapsedNanos()));
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private FloatHistogram<?> createNewHistogram() {
+    return new FloatHistogram(segmenter);
+  }
+
+  /** Converts nanoseconds to milliseconds. */
+  static float nanosToMillis(long elapsedNanos) {
+    return elapsedNanos / 1000000.0f;
+  }
+}

Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsServlet.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsServlet.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsServlet.java Fri Jan  8 18:53:18 2010
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Map.Entry;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.ipc.RPCContext;
+
+/**
+ * Exposes information provided by a StatsPlugin as
+ * a web page.
+ *
+ * This class follows the same synchronization conventions
+ * as StatsPlugin, to avoid requiring StatsPlugin to serve
+ * a copy of the data.
+ */
+public class StatsServlet extends HttpServlet {
+  private final StatsPlugin statsPlugin;
+
+  public StatsServlet(StatsPlugin statsPlugin) {
+    this.statsPlugin = statsPlugin;
+  }
+
+  @Override
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
+    resp.setContentType("text/html");
+    writeStats(resp.getWriter());
+  }
+
+  void writeStats(Writer w) throws IOException {
+    w.append("<html><head><title>Avro RPC Stats</title></head>");
+    w.append("<body><h1>Avro RPC Stats</h1>");
+
+    w.append("<h2>Active RPCs</h2>");
+    w.append("<ol>");
+    for (Entry<RPCContext, Stopwatch> rpc : this.statsPlugin.activeRpcs.entrySet()) {
+      writeActiveRpc(w, rpc.getKey(), rpc.getValue());
+    }
+    w.append("</ol>");
+
+    w.append("<h2>Per-method Timing</h2>");
+    synchronized(this.statsPlugin.methodTimings) {
+      for (Entry<Message, FloatHistogram<?>> e :
+        this.statsPlugin.methodTimings.entrySet()) {
+        writeMethod(w, e.getKey(), e.getValue());
+      }
+    }
+    w.append("</body></html>");
+  }
+
+  private void writeActiveRpc(Writer w, RPCContext rpc, Stopwatch stopwatch) throws IOException {
+    w.append("<li>").append(rpc.getMessage().getName()).append(": ");
+    w.append(formatMillis(StatsPlugin.nanosToMillis(stopwatch.elapsedNanos())));
+    w.append("</li>");
+  }
+
+  private void writeMethod(Writer w, Message message, FloatHistogram<?> hist) throws IOException {
+    w.append("<h3>").append(message.getName()).append("</h3>");
+    w.append("<p>Number of calls: ");
+    w.append(Integer.toString(hist.getCount()));
+    w.append("</p><p>Average Duration: ");
+    w.append(formatMillis(hist.getMean()));
+    w.append("</p>");
+    w.append("</p><p>Std Dev: ");
+    w.append(formatMillis(hist.getUnbiasedStdDev()));
+    w.append("</p>");
+
+    w.append("<dl>");
+
+    for (Histogram.Entry<?> e : hist.entries()) {
+      w.append("<dt>");
+      w.append(e.bucket.toString());
+      w.append("</dt>");
+      w.append("<dd>").append(Integer.toString(e.count)).append("</dd>");
+      w.append("</dt>");
+    }
+    w.append("</dl>");
+  }
+
+  private CharSequence formatMillis(float millis) {
+    return String.format("%.0fms", millis);
+  }
+}

Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Stopwatch.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Stopwatch.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Stopwatch.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Stopwatch.java Fri Jan  8 18:53:18 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+/** Encapsulates the passing of time. */
+class Stopwatch  {
+  /** Encapsulates ticking time sources. */
+  interface Ticks {
+    /**
+     * Returns a number of "ticks" in nanoseconds.
+     * This should be monotonically non-decreasing.
+     */
+    long ticks();
+  }
+
+  /** Default System time source. */
+  public final static Ticks SYSTEM_TICKS = new SystemTicks();
+
+  private Ticks ticks;
+  private long start;
+  private long elapsed = -1;
+  private boolean running;
+
+  public Stopwatch(Ticks ticks) {
+    this.ticks = ticks;
+  }
+
+  /** Returns seconds that have elapsed since start() */
+  public long elapsedNanos() {
+    if (running) {
+      return this.ticks.ticks() - start;
+    } else {
+      if (elapsed == -1) throw new IllegalStateException();
+      return elapsed;
+    }
+  }
+
+  /** Starts the stopwatch. */
+  public void start() {
+    if (running) throw new IllegalStateException();
+    start = ticks.ticks();
+    running = true;
+  }
+
+  /** Stops the stopwatch and calculates the elapsed time. */
+  public void stop() {
+    if (!running) throw new IllegalStateException();
+    elapsed = ticks.ticks() - start;
+    running = false;
+  }
+
+  /** Implementation of Ticks using System.nanoTime(). */
+  private static class SystemTicks implements Ticks {
+    @Override
+    public long ticks() {
+      return System.nanoTime();
+    }
+  }
+}

Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/package.html
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/package.html?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/package.html (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/package.html Fri Jan  8 18:53:18 2010
@@ -0,0 +1,23 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+Utilities to collect and display IPC statistics.
+</body>
+</html>

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/FakeTicks.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/FakeTicks.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/FakeTicks.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/FakeTicks.java Fri Jan  8 18:53:18 2010
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+import org.apache.avro.ipc.stats.Stopwatch.Ticks;
+
+/** Implements Ticks with manual time-winding. */
+class FakeTicks implements Ticks {
+  long time = 0;
+
+  @Override
+  public long ticks() {
+    return time;
+  }
+
+  public void passTime(long nanos) {
+    time += nanos;
+  }
+
+}

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/StatsPluginOverhead.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/StatsPluginOverhead.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/StatsPluginOverhead.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/StatsPluginOverhead.java Fri Jan  8 18:53:18 2010
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.generic.GenericResponder;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Transceiver;
+
+/**
+ * Naively measures overhead of using the stats plugin.
+ *
+ * The API used is the generic one.
+ * The protocol is the "null" protocol: null is sent
+ * and returned.
+ */
+public class StatsPluginOverhead {
+  /** Number of RPCs per iteration. */
+  private static final int COUNT = 100000;
+  private static final Protocol NULL_PROTOCOL = Protocol.parse(
+      "{\"protocol\": \"null\", "
+      + "\"messages\": { \"null\": {"
+      + "   \"request\": [], "
+      + "   \"response\": \"null\"} } }");
+
+  private static class IdentityResponder extends GenericResponder {
+    public IdentityResponder(Protocol local) {
+      super(local);
+    }
+
+    @Override
+    public Object respond(Message message, Object request)
+        throws AvroRemoteException {
+      return request;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    double with = sendRpcs(true)/1000000000.0;
+    double without = sendRpcs(false)/1000000000.0;
+
+    System.out.println(String.format(
+        "Overhead: %f%%.  RPC/s: %f (with) vs %f (without).  " +
+        "RPC time (ms): %f vs %f",
+        100*(with - without)/(without),
+        COUNT/with,
+        COUNT/without,
+        1000*with/COUNT,
+        1000*without/COUNT));
+  }
+
+  /** Sends RPCs and returns nanos elapsed. */
+  private static long sendRpcs(boolean withPlugin) throws IOException {
+    HttpServer server = createServer(withPlugin);
+    Transceiver t =
+      new HttpTransceiver(new URL("http://127.0.0.1:"+server.getPort()+"/"));
+    GenericRequestor requestor = new GenericRequestor(NULL_PROTOCOL, t);
+
+    long now = System.nanoTime();
+    for (int i = 0; i < COUNT; ++i) {
+      requestor.request("null", null);
+    }
+    long elapsed = System.nanoTime() - now;
+    t.close();
+    server.close();
+    return elapsed;
+  }
+
+  /** Starts an Avro server. */
+  private static HttpServer createServer(boolean withPlugin)
+      throws IOException {
+    Responder r = new IdentityResponder(NULL_PROTOCOL);
+    if (withPlugin) {
+      r.addRPCPlugin(new StatsPlugin());
+    }
+    // Start Avro server
+    return new HttpServer(r, 0);
+  }
+
+}

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestHistogram.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestHistogram.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestHistogram.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestHistogram.java Fri Jan  8 18:53:18 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.avro.ipc.stats.Histogram.Entry;
+import org.apache.avro.ipc.stats.Histogram.Segmenter;
+import org.junit.Test;
+
+public class TestHistogram {
+
+  @Test
+  public void testBasicOperation() {
+    Segmenter<String, Integer> s = new Histogram.TreeMapSegmenter<Integer>(
+        new TreeSet<Integer>(Arrays.asList(0, 1, 2, 4, 8, 16)));
+
+    Histogram<String, Integer> h = new Histogram<String, Integer>(s);
+
+    for(int i = 0; i < 20; ++i) {
+      h.add(i);
+    }
+    assertEquals(20, h.getCount());
+    assertArrayEquals(new int[] { 1, 1, 2, 4, 8, 4 }, h.getHistogram());
+
+    assertEquals("[0,1)=1;[1,2)=1;[2,4)=2;[4,8)=4;[8,16)=8;[16,infinity)=4", h.toString());
+
+    List<Entry<String>> entries = new ArrayList<Entry<String>>();
+    for (Entry<String> entry : h.entries()) {
+      entries.add(entry);
+    }
+    assertEquals("[0,1)", entries.get(0).bucket);
+    assertEquals(4, entries.get(5).count);
+    assertEquals(6, entries.size());
+  }
+
+  @Test(expected=Histogram.SegmenterException.class)
+  public void testBadValue() {
+    Segmenter<String, Long> s = new Histogram.TreeMapSegmenter<Long>(
+        new TreeSet<Long>(Arrays.asList(0L, 1L, 2L, 4L, 8L, 16L)));
+
+    Histogram<String, Long> h = new Histogram<String, Long>(s);
+    h.add(-1L);
+  }
+
+  /** Only has one bucket */
+  static class SingleBucketSegmenter implements Segmenter<String, Float >{
+    @Override
+    public Iterator<String> getBuckets() {
+      return Arrays.asList("X").iterator();
+    }
+
+    @Override
+    public int segment(Float value) { return 0; }
+
+    @Override
+    public int size() { return 1; }
+  }
+
+  @Test
+  public void testFloatHistogram() {
+    FloatHistogram<String> h = new FloatHistogram<String>(new SingleBucketSegmenter());
+    h.add(12.0f);
+    h.add(10.0f);
+    h.add(20.0f);
+
+    assertEquals(3, h.getCount());
+    assertEquals(14.0f, h.getMean(), 0.0001);
+    assertEquals(5.291f, h.getUnbiasedStdDev(), 0.001);
+  }
+
+}

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStatsPluginAndServlet.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStatsPluginAndServlet.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStatsPluginAndServlet.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStatsPluginAndServlet.java Fri Jan  8 18:53:18 2010
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.generic.GenericResponder;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.LocalTransceiver;
+import org.apache.avro.ipc.RPCContext;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Transceiver;
+import org.junit.Test;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.log.Log;
+
+public class TestStatsPluginAndServlet {
+  Protocol protocol = Protocol.parse("" + "{\"protocol\": \"Minimal\", "
+      + "\"messages\": { \"m\": {"
+      + "   \"request\": [{\"name\": \"x\", \"type\": \"int\"}], "
+      + "   \"response\": \"int\"} } }");
+  Message message = protocol.getMessages().get("m");
+
+  private static final long MS = 1000*1000L;
+
+  /** Returns an HTML string. */
+  private String generateServletResponse(StatsPlugin statsPlugin)
+      throws IOException {
+    StatsServlet servlet = new StatsServlet(statsPlugin);
+    StringWriter w = new StringWriter();
+    servlet.writeStats(w);
+    String o = w.toString();
+    return o;
+  }
+
+  /** Expects 0 and returns 1. */
+  static class TestResponder extends GenericResponder {
+    public TestResponder(Protocol local) {
+      super(local);
+    }
+
+    @Override
+    public Object respond(Message message, Object request)
+        throws AvroRemoteException {
+      assertEquals(0, ((GenericRecord) request).get("x"));
+      return 1;
+    }
+
+  }
+
+  private void makeRequest(Transceiver t) throws IOException {
+    GenericRecord params = new GenericData.Record(protocol.getMessages().get(
+        "m").getRequest());
+    params.put("x", 0);
+    GenericRequestor r = new GenericRequestor(protocol, t);
+    assertEquals(1, r.request("m", params));
+  }
+
+  @Test
+  public void testFullServerPath() throws IOException {
+    Responder r = new TestResponder(protocol);
+    StatsPlugin statsPlugin = new StatsPlugin();
+    r.addRPCPlugin(statsPlugin);
+    Transceiver t = new LocalTransceiver(r);
+
+    for (int i = 0; i < 10; ++i) {
+      makeRequest(t);
+    }
+
+    String o = generateServletResponse(statsPlugin);
+    assertTrue(o.contains("Number of calls: 10"));
+  }
+
+  @Test
+  public void testMultipleRPCs() throws IOException {
+    FakeTicks t = new FakeTicks();
+    StatsPlugin statsPlugin = new StatsPlugin(t, StatsPlugin.DEFAULT_SEGMENTER);
+    RPCContext context1 = makeContext();
+    RPCContext context2 = makeContext();
+    statsPlugin.serverReceiveRequest(context1);
+    t.passTime(100*MS); // first takes 100ms
+    statsPlugin.serverReceiveRequest(context2);
+    String r = generateServletResponse(statsPlugin);
+    // Check in progress RPCs
+    assertTrue(r.contains("m: 0ms"));
+    assertTrue(r.contains("m: 100ms"));
+    statsPlugin.serverSendResponse(context1);
+    t.passTime(900*MS); // second takes 900ms
+    statsPlugin.serverSendResponse(context2);
+
+    r = generateServletResponse(statsPlugin);
+    assertTrue(r.contains("Average Duration: 500ms"));
+  }
+
+  private RPCContext makeContext() {
+    RPCContext context = new RPCContext();
+    context.setMessage(message);
+    return context;
+  }
+
+  /** Sleeps as requested. */
+  private static class SleepyResponder extends GenericResponder {
+    public SleepyResponder(Protocol local) {
+      super(local);
+    }
+
+    @Override
+    public Object respond(Message message, Object request)
+        throws AvroRemoteException {
+      try {
+        Thread.sleep((Long)((GenericRecord)request).get("millis"));
+      } catch (InterruptedException e) {
+        throw new AvroRemoteException(e);
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Demo program for using RPC stats.  avroj can be used (as below)
+   * to trigger RPCs.
+   * <pre>
+   * java -jar build/avroj-1.2.0-dev.jar rpcsend '{"protocol":"sleepy","namespace":null,"types":[],"messages":{"sleep":{"request":[{"name":"millis","type":"long"}],"response":"null"}}}' sleep localhost 7002 '{"millis": 20000}'
+   * </pre>
+   * @param args
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length == 0) {
+      args = new String[] { "7002", "7003" };
+    }
+    Protocol protocol = Protocol.parse("{\"protocol\": \"sleepy\", "
+        + "\"messages\": { \"sleep\": {"
+        + "   \"request\": [{\"name\": \"millis\", \"type\": \"long\"}], "
+        + "   \"response\": \"null\"} } }");
+    Log.info("Using protocol: " + protocol.toString());
+    Responder r = new SleepyResponder(protocol);
+    StatsPlugin p = new StatsPlugin();
+    r.addRPCPlugin(p);
+
+    // Start Avro server
+    new HttpServer(r, Integer.parseInt(args[0]));
+
+    // Ideally we could use the same Jetty server
+    Server httpServer = new Server(Integer.parseInt(args[1]));
+    new Context(httpServer, "/").addServlet(
+        new ServletHolder(new StatsServlet(p)), "/*");
+    httpServer.start();
+    while(true) {
+      Thread.sleep(60*1000);
+    }
+  }
+}

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStopwatch.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStopwatch.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStopwatch.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStopwatch.java Fri Jan  8 18:53:18 2010
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class TestStopwatch {
+  @Test
+  public void testNormal() {
+    FakeTicks f = new FakeTicks();
+    Stopwatch s = new Stopwatch(f);
+    f.passTime(10);
+    s.start();
+    f.passTime(20);
+    assertEquals(20, s.elapsedNanos());
+    f.passTime(40);
+    s.stop();
+    f.passTime(80);
+    assertEquals(60, s.elapsedNanos());
+  }
+
+  @Test(expected=IllegalStateException.class)
+  public void testNotStarted1() {
+    FakeTicks f = new FakeTicks();
+    Stopwatch s = new Stopwatch(f);
+    s.elapsedNanos();
+  }
+
+  @Test(expected=IllegalStateException.class)
+  public void testNotStarted2() {
+    FakeTicks f = new FakeTicks();
+    Stopwatch s = new Stopwatch(f);
+    s.stop();
+  }
+
+  @Test(expected=IllegalStateException.class)
+  public void testTwiceStarted() {
+    FakeTicks f = new FakeTicks();
+    Stopwatch s = new Stopwatch(f);
+    s.start();
+    s.start();
+  }
+
+  @Test(expected=IllegalStateException.class)
+  public void testTwiceStopped() {
+    FakeTicks f = new FakeTicks();
+    Stopwatch s = new Stopwatch(f);
+    s.start();
+    s.stop();
+    s.stop();
+  }
+
+  @Test
+  public void testSystemStopwatch() {
+    Stopwatch s = new Stopwatch(Stopwatch.SYSTEM_TICKS);
+    s.start();
+    s.stop();
+    assertTrue(s.elapsedNanos() >= 0);
+  }
+
+}