You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/01/08 19:53:19 UTC
svn commit: r897299 - in /hadoop/avro/trunk: ./
src/java/org/apache/avro/ipc/stats/ src/test/java/org/apache/avro/ipc/stats/
Author: cutting
Date: Fri Jan 8 18:53:18 2010
New Revision: 897299
URL: http://svn.apache.org/viewvc?rev=897299&view=rev
Log:
AVRO-273, AVRO-275, & AVRO-279. Add Java RPC statistics collection and display. Contributed by Philip Zeyliger.
Added:
hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/
hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/FloatHistogram.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Histogram.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsPlugin.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsServlet.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Stopwatch.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/package.html
hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/
hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/FakeTicks.java
hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/StatsPluginOverhead.java
hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestHistogram.java
hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStatsPluginAndServlet.java
hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStopwatch.java
Modified:
hadoop/avro/trunk/CHANGES.txt
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=897299&r1=897298&r2=897299&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Fri Jan 8 18:53:18 2010
@@ -50,6 +50,9 @@
AVRO-271. Add a Java local RPC transceiver. (Philip Zeyliger via cutting)
+ AVRO-273, AVRO-275, & AVRO-279. Add Java RPC statistics collection
+ and display. (Philip Zeyliger via cutting)
+
IMPROVEMENTS
AVRO-157. Changes from code review comments for C++. (sbanacho)
Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/FloatHistogram.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/FloatHistogram.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/FloatHistogram.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/FloatHistogram.java Fri Jan 8 18:53:18 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+/**
+ * Specific implementation of histogram for floats,
+ * which also keeps track of basic summary statistics.
+ * @param <B>
+ */
+class FloatHistogram<B> extends Histogram<B, Float> {
+ private float runningSum;
+ private float runningSumOfSquares;
+
+ public FloatHistogram(Segmenter<B, Float> segmenter) {
+ super(segmenter);
+ }
+
+ @Override
+ public void add(Float value) {
+ super.add(value);
+ runningSum += value;
+ runningSumOfSquares += value*value;
+ }
+
+ public float getMean() {
+ if (totalCount == 0) {
+ return Float.NaN;
+ }
+ return runningSum / totalCount;
+ }
+
+ public float getUnbiasedStdDev() {
+ if (totalCount <= 1) {
+ return Float.NaN;
+ }
+ float mean = getMean();
+ return (float)Math.sqrt((runningSumOfSquares - totalCount*mean*mean)/(totalCount - 1));
+ }
+}
Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Histogram.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Histogram.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Histogram.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Histogram.java Fri Jan 8 18:53:18 2010
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeMap;
+
+/**
+ * Represents a histogram of values. This class uses a {@link Segmenter}
+ * to determine which bucket to place a given value into.
+ *
+ * Note that Histogram, by itself, is not synchronized.
+ * @param <B> Bucket type. Often String, since buckets are typically
+ * used for their toString() representation.
+ * @param <T> Type of value
+ */
+class Histogram<B, T> {
+ private Segmenter<B, T> segmenter;
+ private int[] counts;
+ protected int totalCount;
+
+ /**
+ * Interface to determine which bucket to place a value in.
+ *
+ * Segmenters should be immutable, so many histograms can re-use
+ * the same segmenter.
+ */
+ interface Segmenter<B, T> {
+ /** Number of buckets to use. */
+ int size();
+ /**
+ * Which bucket to place value in.
+ *
+ * @return Index of bucket for the value. At least 0 and less than size().
+ * @throws SegmenterException if value does not fit in a bucket.
+ */
+ int segment(T value);
+ /**
+ * Returns an iterator of buckets. The order of iteration
+ * is consistent with the segment numbers.
+ */
+ Iterator<B> getBuckets();
+ }
+
+ public static class SegmenterException extends RuntimeException {
+ public SegmenterException(String s) {
+ super(s);
+ }
+ }
+
+ public static class TreeMapSegmenter<T extends Comparable<T>>
+ implements Segmenter<String, T> {
+ private TreeMap<T, Integer> index = new TreeMap<T, Integer>();
+ public TreeMapSegmenter(SortedSet<T> leftEndpoints) {
+ if (leftEndpoints.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Endpoints must not be empty: " + leftEndpoints);
+ }
+ int i = 0;
+ for (T t : leftEndpoints) {
+ index.put(t, i++);
+ }
+ }
+
+ public int segment(T value) {
+ Map.Entry<T, Integer> e = index.floorEntry(value);
+ if (e == null) {
+ throw new SegmenterException("Could not find bucket for: " + value);
+ }
+ return e.getValue();
+ }
+
+ @Override
+ public int size() {
+ return index.size();
+ }
+
+ private String rangeAsString(T a, T b) {
+ return String.format("[%s,%s)", a, b == null ? "infinity" : b);
+ }
+
+ @Override
+ public Iterator<String> getBuckets() {
+ return new Iterator<String>() {
+ Iterator<T> it = index.keySet().iterator();
+ T cur = it.next(); // there's always at least one element
+
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override
+ public String next() {
+ T left = cur;
+ cur = it.hasNext() ? it.next() : null;
+ return rangeAsString(left, cur);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+
+ }
+ };
+ }
+ }
+
+ /**
+ * Creates a histogram using the specified segmenter.
+ */
+ public Histogram(Segmenter<B, T> segmenter) {
+ this.segmenter = segmenter;
+ this.counts = new int[segmenter.size()];
+ }
+
+ /** Tallies a value in the histogram. */
+ public void add(T value) {
+ int i = segmenter.segment(value);
+ counts[i]++;
+ totalCount++;
+ }
+
+ /**
+ * Returns the underlying bucket values.
+ */
+ public int[] getHistogram() {
+ return counts;
+ }
+
+ /** Returns the total count of entries. */
+ public int getCount() {
+ return totalCount;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for (Entry<B> e : entries()) {
+ if (!first) {
+ sb.append(";");
+ } else {
+ first = false;
+ }
+ sb.append(e.bucket).append("=").append(e.count);
+ }
+ return sb.toString();
+ }
+
+ static class Entry<B> {
+ public Entry(B bucket, int count) {
+ this.bucket = bucket;
+ this.count = count;
+ }
+ B bucket;
+ int count;
+ }
+
+ private class EntryIterator implements Iterable<Entry<B>>, Iterator<Entry<B>> {
+ int i = 0;
+ Iterator<B> bucketNameIterator = segmenter.getBuckets();
+
+ @Override
+ public Iterator<Entry<B>> iterator() {
+ return this;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return i < segmenter.size();
+ }
+
+ @Override
+ public Entry<B> next() {
+ return new Entry<B>(bucketNameIterator.next(), counts[i++]);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ public Iterable<Entry<B>> entries() {
+ return new EntryIterator();
+ }
+}
Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsPlugin.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsPlugin.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsPlugin.java Fri Jan 8 18:53:18 2010
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.ipc.RPCContext;
+import org.apache.avro.ipc.RPCPlugin;
+import org.apache.avro.ipc.stats.Histogram.Segmenter;
+import org.apache.avro.ipc.stats.Stopwatch.Ticks;
+
+/**
+ * Collects count and latency statistics about RPC calls. Keeps
+ * data for every method.
+ *
+ * This uses milliseconds as the standard unit of measure
+ * throughout the class, stored in floats.
+ */
+public class StatsPlugin extends RPCPlugin {
+ /** Static declaration of histogram buckets. */
+ static final Segmenter<String, Float> DEFAULT_SEGMENTER =
+ new Histogram.TreeMapSegmenter<Float>(new TreeSet<Float>(Arrays.asList(
+ 0f,
+ 25f,
+ 50f,
+ 75f,
+ 100f,
+ 200f,
+ 300f,
+ 500f,
+ 750f,
+ 1000f, // 1 second
+ 2000f,
+ 5000f,
+ 10000f,
+ 60000f, // 1 minute
+ 600000f)));
+
+ /** Per-method histograms.
+ * Must be accessed while holding a lock on methodTimings. */
+ Map<Message, FloatHistogram<?>> methodTimings =
+ new HashMap<Message, FloatHistogram<?>>();
+
+ /** RPCs in flight. */
+ ConcurrentMap<RPCContext, Stopwatch> activeRpcs =
+ new ConcurrentHashMap<RPCContext, Stopwatch>();
+ private Ticks ticks;
+
+ private Segmenter<?, Float> segmenter;
+
+ /** Construct a plugin with custom Ticks and Segmenter implementations. */
+ StatsPlugin(Ticks ticks, Segmenter<?, Float> segmenter) {
+ this.segmenter = segmenter;
+ this.ticks = ticks;
+ }
+
+ /** Construct a plugin with default (system) ticks, and default
+ * histogram segmentation. */
+ public StatsPlugin() {
+ this(Stopwatch.SYSTEM_TICKS, DEFAULT_SEGMENTER);
+ }
+
+ @Override
+ public void serverReceiveRequest(RPCContext context) {
+ Stopwatch t = new Stopwatch(ticks);
+ t.start();
+ this.activeRpcs.put(context, t);
+ }
+
+ @Override
+ public void serverSendResponse(RPCContext context) {
+ Stopwatch t = this.activeRpcs.remove(context);
+ t.stop();
+ publish(context, t);
+ }
+
+ /** Adds timing to the histograms. */
+ private void publish(RPCContext context, Stopwatch t) {
+ Message message = context.getMessage();
+ if (message == null) throw new IllegalArgumentException();
+ synchronized(methodTimings) {
+ FloatHistogram<?> h = methodTimings.get(context.getMessage());
+ if (h == null) {
+ h = createNewHistogram();
+ methodTimings.put(context.getMessage(), h);
+ }
+ h.add(nanosToMillis(t.elapsedNanos()));
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private FloatHistogram<?> createNewHistogram() {
+ return new FloatHistogram(segmenter);
+ }
+
+ /** Converts nanoseconds to milliseconds. */
+ static float nanosToMillis(long elapsedNanos) {
+ return elapsedNanos / 1000000.0f;
+ }
+}
Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsServlet.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsServlet.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/StatsServlet.java Fri Jan 8 18:53:18 2010
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Map.Entry;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.ipc.RPCContext;
+
+/**
+ * Exposes information provided by a StatsPlugin as
+ * a web page.
+ *
+ * This class follows the same synchronization conventions
+ * as StatsPlugin, to avoid requiring StatsPlugin to serve
+ * a copy of the data.
+ */
+public class StatsServlet extends HttpServlet {
+ private final StatsPlugin statsPlugin;
+
+ public StatsServlet(StatsPlugin statsPlugin) {
+ this.statsPlugin = statsPlugin;
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+ resp.setContentType("text/html");
+ writeStats(resp.getWriter());
+ }
+
+ void writeStats(Writer w) throws IOException {
+ w.append("<html><head><title>Avro RPC Stats</title></head>");
+ w.append("<body><h1>Avro RPC Stats</h1>");
+
+ w.append("<h2>Active RPCs</h2>");
+ w.append("<ol>");
+ for (Entry<RPCContext, Stopwatch> rpc : this.statsPlugin.activeRpcs.entrySet()) {
+ writeActiveRpc(w, rpc.getKey(), rpc.getValue());
+ }
+ w.append("</ol>");
+
+ w.append("<h2>Per-method Timing</h2>");
+ synchronized(this.statsPlugin.methodTimings) {
+ for (Entry<Message, FloatHistogram<?>> e :
+ this.statsPlugin.methodTimings.entrySet()) {
+ writeMethod(w, e.getKey(), e.getValue());
+ }
+ }
+ w.append("</body></html>");
+ }
+
+ private void writeActiveRpc(Writer w, RPCContext rpc, Stopwatch stopwatch) throws IOException {
+ w.append("<li>").append(rpc.getMessage().getName()).append(": ");
+ w.append(formatMillis(StatsPlugin.nanosToMillis(stopwatch.elapsedNanos())));
+ w.append("</li>");
+ }
+
+ private void writeMethod(Writer w, Message message, FloatHistogram<?> hist) throws IOException {
+ w.append("<h3>").append(message.getName()).append("</h3>");
+ w.append("<p>Number of calls: ");
+ w.append(Integer.toString(hist.getCount()));
+ w.append("</p><p>Average Duration: ");
+ w.append(formatMillis(hist.getMean()));
+ w.append("</p>");
+ w.append("</p><p>Std Dev: ");
+ w.append(formatMillis(hist.getUnbiasedStdDev()));
+ w.append("</p>");
+
+ w.append("<dl>");
+
+ for (Histogram.Entry<?> e : hist.entries()) {
+ w.append("<dt>");
+ w.append(e.bucket.toString());
+ w.append("</dt>");
+ w.append("<dd>").append(Integer.toString(e.count)).append("</dd>");
+ w.append("</dt>");
+ }
+ w.append("</dl>");
+ }
+
+ private CharSequence formatMillis(float millis) {
+ return String.format("%.0fms", millis);
+ }
+}
Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Stopwatch.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Stopwatch.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Stopwatch.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/Stopwatch.java Fri Jan 8 18:53:18 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+/** Encapsulates the passing of time. */
+class Stopwatch {
+ /** Encapsulates ticking time sources. */
+ interface Ticks {
+ /**
+ * Returns a number of "ticks" in nanoseconds.
+ * This should be monotonically non-decreasing.
+ */
+ long ticks();
+ }
+
+ /** Default System time source. */
+ public final static Ticks SYSTEM_TICKS = new SystemTicks();
+
+ private Ticks ticks;
+ private long start;
+ private long elapsed = -1;
+ private boolean running;
+
+ public Stopwatch(Ticks ticks) {
+ this.ticks = ticks;
+ }
+
+ /** Returns seconds that have elapsed since start() */
+ public long elapsedNanos() {
+ if (running) {
+ return this.ticks.ticks() - start;
+ } else {
+ if (elapsed == -1) throw new IllegalStateException();
+ return elapsed;
+ }
+ }
+
+ /** Starts the stopwatch. */
+ public void start() {
+ if (running) throw new IllegalStateException();
+ start = ticks.ticks();
+ running = true;
+ }
+
+ /** Stops the stopwatch and calculates the elapsed time. */
+ public void stop() {
+ if (!running) throw new IllegalStateException();
+ elapsed = ticks.ticks() - start;
+ running = false;
+ }
+
+ /** Implementation of Ticks using System.nanoTime(). */
+ private static class SystemTicks implements Ticks {
+ @Override
+ public long ticks() {
+ return System.nanoTime();
+ }
+ }
+}
Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/package.html
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/package.html?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/package.html (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/stats/package.html Fri Jan 8 18:53:18 2010
@@ -0,0 +1,23 @@
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<body>
+Utilities to collect and display IPC statistics.
+</body>
+</html>
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/FakeTicks.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/FakeTicks.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/FakeTicks.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/FakeTicks.java Fri Jan 8 18:53:18 2010
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+import org.apache.avro.ipc.stats.Stopwatch.Ticks;
+
+/** Implements Ticks with manual time-winding. */
+class FakeTicks implements Ticks {
+ long time = 0;
+
+ @Override
+ public long ticks() {
+ return time;
+ }
+
+ public void passTime(long nanos) {
+ time += nanos;
+ }
+
+}
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/StatsPluginOverhead.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/StatsPluginOverhead.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/StatsPluginOverhead.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/StatsPluginOverhead.java Fri Jan 8 18:53:18 2010
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.generic.GenericResponder;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Transceiver;
+
+/**
+ * Naively measures overhead of using the stats plugin.
+ *
+ * The API used is the generic one.
+ * The protocol is the "null" protocol: null is sent
+ * and returned.
+ */
+public class StatsPluginOverhead {
+ /** Number of RPCs per iteration. */
+ private static final int COUNT = 100000;
+ private static final Protocol NULL_PROTOCOL = Protocol.parse(
+ "{\"protocol\": \"null\", "
+ + "\"messages\": { \"null\": {"
+ + " \"request\": [], "
+ + " \"response\": \"null\"} } }");
+
+ private static class IdentityResponder extends GenericResponder {
+ public IdentityResponder(Protocol local) {
+ super(local);
+ }
+
+ @Override
+ public Object respond(Message message, Object request)
+ throws AvroRemoteException {
+ return request;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ double with = sendRpcs(true)/1000000000.0;
+ double without = sendRpcs(false)/1000000000.0;
+
+ System.out.println(String.format(
+ "Overhead: %f%%. RPC/s: %f (with) vs %f (without). " +
+ "RPC time (ms): %f vs %f",
+ 100*(with - without)/(without),
+ COUNT/with,
+ COUNT/without,
+ 1000*with/COUNT,
+ 1000*without/COUNT));
+ }
+
+ /** Sends RPCs and returns nanos elapsed. */
+ private static long sendRpcs(boolean withPlugin) throws IOException {
+ HttpServer server = createServer(withPlugin);
+ Transceiver t =
+ new HttpTransceiver(new URL("http://127.0.0.1:"+server.getPort()+"/"));
+ GenericRequestor requestor = new GenericRequestor(NULL_PROTOCOL, t);
+
+ long now = System.nanoTime();
+ for (int i = 0; i < COUNT; ++i) {
+ requestor.request("null", null);
+ }
+ long elapsed = System.nanoTime() - now;
+ t.close();
+ server.close();
+ return elapsed;
+ }
+
+ /** Starts an Avro server. */
+ private static HttpServer createServer(boolean withPlugin)
+ throws IOException {
+ Responder r = new IdentityResponder(NULL_PROTOCOL);
+ if (withPlugin) {
+ r.addRPCPlugin(new StatsPlugin());
+ }
+ // Start Avro server
+ return new HttpServer(r, 0);
+ }
+
+}
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestHistogram.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestHistogram.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestHistogram.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestHistogram.java Fri Jan 8 18:53:18 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.avro.ipc.stats.Histogram.Entry;
+import org.apache.avro.ipc.stats.Histogram.Segmenter;
+import org.junit.Test;
+
+public class TestHistogram {
+
+ @Test
+ public void testBasicOperation() {
+ Segmenter<String, Integer> s = new Histogram.TreeMapSegmenter<Integer>(
+ new TreeSet<Integer>(Arrays.asList(0, 1, 2, 4, 8, 16)));
+
+ Histogram<String, Integer> h = new Histogram<String, Integer>(s);
+
+ for(int i = 0; i < 20; ++i) {
+ h.add(i);
+ }
+ assertEquals(20, h.getCount());
+ assertArrayEquals(new int[] { 1, 1, 2, 4, 8, 4 }, h.getHistogram());
+
+ assertEquals("[0,1)=1;[1,2)=1;[2,4)=2;[4,8)=4;[8,16)=8;[16,infinity)=4", h.toString());
+
+ List<Entry<String>> entries = new ArrayList<Entry<String>>();
+ for (Entry<String> entry : h.entries()) {
+ entries.add(entry);
+ }
+ assertEquals("[0,1)", entries.get(0).bucket);
+ assertEquals(4, entries.get(5).count);
+ assertEquals(6, entries.size());
+ }
+
+ @Test(expected=Histogram.SegmenterException.class)
+ public void testBadValue() {
+ Segmenter<String, Long> s = new Histogram.TreeMapSegmenter<Long>(
+ new TreeSet<Long>(Arrays.asList(0L, 1L, 2L, 4L, 8L, 16L)));
+
+ Histogram<String, Long> h = new Histogram<String, Long>(s);
+ h.add(-1L);
+ }
+
+ /** Only has one bucket */
+ static class SingleBucketSegmenter implements Segmenter<String, Float >{
+ @Override
+ public Iterator<String> getBuckets() {
+ return Arrays.asList("X").iterator();
+ }
+
+ @Override
+ public int segment(Float value) { return 0; }
+
+ @Override
+ public int size() { return 1; }
+ }
+
+ @Test
+ public void testFloatHistogram() {
+ FloatHistogram<String> h = new FloatHistogram<String>(new SingleBucketSegmenter());
+ h.add(12.0f);
+ h.add(10.0f);
+ h.add(20.0f);
+
+ assertEquals(3, h.getCount());
+ assertEquals(14.0f, h.getMean(), 0.0001);
+ assertEquals(5.291f, h.getUnbiasedStdDev(), 0.001);
+ }
+
+}
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStatsPluginAndServlet.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStatsPluginAndServlet.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStatsPluginAndServlet.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStatsPluginAndServlet.java Fri Jan 8 18:53:18 2010
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.generic.GenericResponder;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.LocalTransceiver;
+import org.apache.avro.ipc.RPCContext;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Transceiver;
+import org.junit.Test;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.log.Log;
+
+public class TestStatsPluginAndServlet {
+ Protocol protocol = Protocol.parse("" + "{\"protocol\": \"Minimal\", "
+ + "\"messages\": { \"m\": {"
+ + " \"request\": [{\"name\": \"x\", \"type\": \"int\"}], "
+ + " \"response\": \"int\"} } }");
+ Message message = protocol.getMessages().get("m");
+
+ private static final long MS = 1000*1000L;
+
+ /** Returns an HTML string. */
+ private String generateServletResponse(StatsPlugin statsPlugin)
+ throws IOException {
+ StatsServlet servlet = new StatsServlet(statsPlugin);
+ StringWriter w = new StringWriter();
+ servlet.writeStats(w);
+ String o = w.toString();
+ return o;
+ }
+
+ /** Expects 0 and returns 1. */
+ static class TestResponder extends GenericResponder {
+ public TestResponder(Protocol local) {
+ super(local);
+ }
+
+ @Override
+ public Object respond(Message message, Object request)
+ throws AvroRemoteException {
+ assertEquals(0, ((GenericRecord) request).get("x"));
+ return 1;
+ }
+
+ }
+
+ private void makeRequest(Transceiver t) throws IOException {
+ GenericRecord params = new GenericData.Record(protocol.getMessages().get(
+ "m").getRequest());
+ params.put("x", 0);
+ GenericRequestor r = new GenericRequestor(protocol, t);
+ assertEquals(1, r.request("m", params));
+ }
+
+ @Test
+ public void testFullServerPath() throws IOException {
+ Responder r = new TestResponder(protocol);
+ StatsPlugin statsPlugin = new StatsPlugin();
+ r.addRPCPlugin(statsPlugin);
+ Transceiver t = new LocalTransceiver(r);
+
+ for (int i = 0; i < 10; ++i) {
+ makeRequest(t);
+ }
+
+ String o = generateServletResponse(statsPlugin);
+ assertTrue(o.contains("Number of calls: 10"));
+ }
+
+ @Test
+ public void testMultipleRPCs() throws IOException {
+ FakeTicks t = new FakeTicks();
+ StatsPlugin statsPlugin = new StatsPlugin(t, StatsPlugin.DEFAULT_SEGMENTER);
+ RPCContext context1 = makeContext();
+ RPCContext context2 = makeContext();
+ statsPlugin.serverReceiveRequest(context1);
+ t.passTime(100*MS); // first takes 100ms
+ statsPlugin.serverReceiveRequest(context2);
+ String r = generateServletResponse(statsPlugin);
+ // Check in progress RPCs
+ assertTrue(r.contains("m: 0ms"));
+ assertTrue(r.contains("m: 100ms"));
+ statsPlugin.serverSendResponse(context1);
+ t.passTime(900*MS); // second takes 900ms
+ statsPlugin.serverSendResponse(context2);
+
+ r = generateServletResponse(statsPlugin);
+ assertTrue(r.contains("Average Duration: 500ms"));
+ }
+
+ private RPCContext makeContext() {
+ RPCContext context = new RPCContext();
+ context.setMessage(message);
+ return context;
+ }
+
+ /** Sleeps as requested. */
+ private static class SleepyResponder extends GenericResponder {
+ public SleepyResponder(Protocol local) {
+ super(local);
+ }
+
+ @Override
+ public Object respond(Message message, Object request)
+ throws AvroRemoteException {
+ try {
+ Thread.sleep((Long)((GenericRecord)request).get("millis"));
+ } catch (InterruptedException e) {
+ throw new AvroRemoteException(e);
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Demo program for using RPC stats. avroj can be used (as below)
+ * to trigger RPCs.
+ * <pre>
+ * java -jar build/avroj-1.2.0-dev.jar rpcsend '{"protocol":"sleepy","namespace":null,"types":[],"messages":{"sleep":{"request":[{"name":"millis","type":"long"}],"response":"null"}}}' sleep localhost 7002 '{"millis": 20000}'
+ * </pre>
+ * @param args
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ if (args.length == 0) {
+ args = new String[] { "7002", "7003" };
+ }
+ Protocol protocol = Protocol.parse("{\"protocol\": \"sleepy\", "
+ + "\"messages\": { \"sleep\": {"
+ + " \"request\": [{\"name\": \"millis\", \"type\": \"long\"}], "
+ + " \"response\": \"null\"} } }");
+ Log.info("Using protocol: " + protocol.toString());
+ Responder r = new SleepyResponder(protocol);
+ StatsPlugin p = new StatsPlugin();
+ r.addRPCPlugin(p);
+
+ // Start Avro server
+ new HttpServer(r, Integer.parseInt(args[0]));
+
+ // Ideally we could use the same Jetty server
+ Server httpServer = new Server(Integer.parseInt(args[1]));
+ new Context(httpServer, "/").addServlet(
+ new ServletHolder(new StatsServlet(p)), "/*");
+ httpServer.start();
+ while(true) {
+ Thread.sleep(60*1000);
+ }
+ }
+}
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStopwatch.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStopwatch.java?rev=897299&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStopwatch.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/ipc/stats/TestStopwatch.java Fri Jan 8 18:53:18 2010
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.stats;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class TestStopwatch {
+ @Test
+ public void testNormal() {
+ FakeTicks f = new FakeTicks();
+ Stopwatch s = new Stopwatch(f);
+ f.passTime(10);
+ s.start();
+ f.passTime(20);
+ assertEquals(20, s.elapsedNanos());
+ f.passTime(40);
+ s.stop();
+ f.passTime(80);
+ assertEquals(60, s.elapsedNanos());
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testNotStarted1() {
+ FakeTicks f = new FakeTicks();
+ Stopwatch s = new Stopwatch(f);
+ s.elapsedNanos();
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testNotStarted2() {
+ FakeTicks f = new FakeTicks();
+ Stopwatch s = new Stopwatch(f);
+ s.stop();
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testTwiceStarted() {
+ FakeTicks f = new FakeTicks();
+ Stopwatch s = new Stopwatch(f);
+ s.start();
+ s.start();
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testTwiceStopped() {
+ FakeTicks f = new FakeTicks();
+ Stopwatch s = new Stopwatch(f);
+ s.start();
+ s.stop();
+ s.stop();
+ }
+
+ @Test
+ public void testSystemStopwatch() {
+ Stopwatch s = new Stopwatch(Stopwatch.SYSTEM_TICKS);
+ s.start();
+ s.stop();
+ assertTrue(s.elapsedNanos() >= 0);
+ }
+
+}