Posted to commits@kafka.apache.org by jk...@apache.org on 2014/01/28 20:16:28 UTC

[2/7] KAFKA-1227 New producer!

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/utils/KafkaThread.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/KafkaThread.java b/clients/src/main/java/kafka/common/utils/KafkaThread.java
new file mode 100644
index 0000000..f830aba
--- /dev/null
+++ b/clients/src/main/java/kafka/common/utils/KafkaThread.java
@@ -0,0 +1,18 @@
+package kafka.common.utils;
+
+/**
+ * A wrapper for Thread that sets the thread name and daemon flag and installs an
+ * uncaught exception handler that prints the stack trace
+ */
+public class KafkaThread extends Thread {
+
+    public KafkaThread(String name, Runnable runnable, boolean daemon) {
+        super(runnable, name);
+        setDaemon(daemon);
+        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            public void uncaughtException(Thread t, Throwable e) {
+                e.printStackTrace();
+            }
+        });
+    }
+
+}
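
A note on usage: KafkaThread bundles a thread name, a daemon flag, and a default uncaught
exception handler, so background threads fail loudly instead of dying silently. A minimal
illustrative sketch (the runnable and thread name below are examples, not part of the patch):

    import kafka.common.utils.KafkaThread;

    public class KafkaThreadExample {
        public static void main(String[] args) throws InterruptedException {
            // any uncaught exception from this runnable is printed by the handler
            Runnable ioLoop = new Runnable() {
                public void run() {
                    throw new IllegalStateException("boom");
                }
            };
            // daemon = true, so the thread will not block JVM shutdown
            Thread t = new KafkaThread("example-io-thread", ioLoop, true);
            t.start();
            t.join();
        }
    }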

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/utils/SystemTime.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/SystemTime.java b/clients/src/main/java/kafka/common/utils/SystemTime.java
new file mode 100644
index 0000000..c8ca09c
--- /dev/null
+++ b/clients/src/main/java/kafka/common/utils/SystemTime.java
@@ -0,0 +1,26 @@
+package kafka.common.utils;
+
+/**
+ * A time implementation that uses the system clock and sleep call
+ */
+public class SystemTime implements Time {
+
+    @Override
+    public long milliseconds() {
+        return System.currentTimeMillis();
+    }
+
+    public long nanoseconds() {
+        return System.nanoTime();
+    }
+
+    @Override
+    public void sleep(long ms) {
+        try {
+            Thread.sleep(ms);
+        } catch (InterruptedException e) {
+            // just wake up early, but restore the interrupt status for callers that check it
+            Thread.currentThread().interrupt();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/utils/Time.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/Time.java b/clients/src/main/java/kafka/common/utils/Time.java
new file mode 100644
index 0000000..390d16f
--- /dev/null
+++ b/clients/src/main/java/kafka/common/utils/Time.java
@@ -0,0 +1,23 @@
+package kafka.common.utils;
+
+/**
+ * An interface abstracting the system clock so that classes that depend on clock time can be
+ * unit tested with a mock clock
+ */
+public interface Time {
+
+    /**
+     * The current time in milliseconds
+     */
+    public long milliseconds();
+
+    /**
+     * The current time in nanoseconds
+     */
+    public long nanoseconds();
+
+    /**
+     * Sleep for the given number of milliseconds
+     */
+    public void sleep(long ms);
+
+}
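
The Time interface exists so that code under test can swap the system clock for a controllable
one; the tests later in this patch use a MockTime class that is not shown in this part of the
diff. A minimal sketch of what such a mock clock could look like (illustrative only, not the
actual MockTime implementation):

    import kafka.common.utils.Time;

    public class FakeTime implements Time {
        private long nanos = System.nanoTime();

        public long milliseconds() {
            return nanos / 1000000;
        }

        public long nanoseconds() {
            return nanos;
        }

        public void sleep(long ms) {
            // advance the fake clock instead of blocking the calling thread
            nanos += ms * 1000000;
        }
    }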

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/Utils.java b/clients/src/main/java/kafka/common/utils/Utils.java
new file mode 100644
index 0000000..f132771
--- /dev/null
+++ b/clients/src/main/java/kafka/common/utils/Utils.java
@@ -0,0 +1,230 @@
+package kafka.common.utils;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+
+import kafka.common.KafkaException;
+
+public class Utils {
+
+    /**
+     * Turn the given UTF8 byte array into a string
+     * 
+     * @param bytes The byte array
+     * @return The string
+     */
+    public static String utf8(byte[] bytes) {
+        try {
+            return new String(bytes, "UTF8");
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException("This shouldn't happen.", e);
+        }
+    }
+
+    /**
+     * Turn a string into a utf8 byte[]
+     * 
+     * @param string The string
+     * @return The byte[]
+     */
+    public static byte[] utf8(String string) {
+        try {
+            return string.getBytes("UTF8");
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException("This shouldn't happen.", e);
+        }
+    }
+
+    /**
+     * Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes
+     * 
+     * @param buffer The buffer to read from
+     * @return The integer read, as a long to avoid signedness
+     */
+    public static long readUnsignedInt(ByteBuffer buffer) {
+        return buffer.getInt() & 0xffffffffL;
+    }
+
+    /**
+     * Read an unsigned integer from the given position without modifying the buffer's position
+     * 
+     * @param buffer the buffer to read from
+     * @param index the index from which to read the integer
+     * @return The integer read, as a long to avoid signedness
+     */
+    public static long readUnsignedInt(ByteBuffer buffer, int index) {
+        return buffer.getInt(index) & 0xffffffffL;
+    }
+
+    /**
+     * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+     * 
+     * @param buffer The buffer to write to
+     * @param value The value to write
+     */
+    public static void writeUnsignedInt(ByteBuffer buffer, long value) {
+        buffer.putInt((int) (value & 0xffffffffL));
+    }
+
+    /**
+     * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+     * 
+     * @param buffer The buffer to write to
+     * @param index The position in the buffer at which to begin writing
+     * @param value The value to write
+     */
+    public static void writeUnsignedInt(ByteBuffer buffer, int index, long value) {
+        buffer.putInt(index, (int) (value & 0xffffffffL));
+    }
+
+    /**
+     * Compute the CRC32 of the byte array
+     * 
+     * @param bytes The array to compute the checksum for
+     * @return The CRC32
+     */
+    public static long crc32(byte[] bytes) {
+        return crc32(bytes, 0, bytes.length);
+    }
+
+    /**
+     * Compute the CRC32 of the segment of the byte array given by the specified offset and size
+     * 
+     * @param bytes The bytes to checksum
+     * @param offset the offset at which to begin checksumming
+     * @param size the number of bytes to checksum
+     * @return The CRC32
+     */
+    public static long crc32(byte[] bytes, int offset, int size) {
+        Crc32 crc = new Crc32();
+        crc.update(bytes, offset, size);
+        return crc.getValue();
+    }
+
+    /**
+     * Get a positive value from the given number by clearing the sign bit; if the number is Integer.MIN_VALUE,
+     * return 0. Note that unlike java.lang.Math.abs, which returns Integer.MIN_VALUE for Integer.MIN_VALUE (!),
+     * the result is never negative, but for other negative inputs it is not the true absolute value either.
+     */
+    public static int abs(int n) {
+        return n & 0x7fffffff;
+    }
+
+    /**
+     * Get the length for UTF8-encoding a string without encoding it first
+     * 
+     * @param s The string to calculate the length for
+     * @return The length when serialized
+     */
+    public static int utf8Length(CharSequence s) {
+        int count = 0;
+        for (int i = 0, len = s.length(); i < len; i++) {
+            char ch = s.charAt(i);
+            if (ch <= 0x7F) {
+                count++;
+            } else if (ch <= 0x7FF) {
+                count += 2;
+            } else if (Character.isHighSurrogate(ch)) {
+                count += 4;
+                ++i;
+            } else {
+                count += 3;
+            }
+        }
+        return count;
+    }
+
+    /**
+     * Read the given byte buffer into a byte array
+     */
+    public static byte[] toArray(ByteBuffer buffer) {
+        return toArray(buffer, 0, buffer.limit());
+    }
+
+    /**
+     * Read a byte array from the given offset and size in the buffer
+     */
+    public static byte[] toArray(ByteBuffer buffer, int offset, int size) {
+        byte[] dest = new byte[size];
+        if (buffer.hasArray()) {
+            System.arraycopy(buffer.array(), buffer.arrayOffset() + offset, dest, 0, size);
+        } else {
+            // read from the requested offset rather than the current position, then restore the position
+            int pos = buffer.position();
+            buffer.position(offset);
+            buffer.get(dest);
+            buffer.position(pos);
+        }
+        return dest;
+    }
+
+    /**
+     * Check that the parameter t is not null
+     * 
+     * @param t The object to check
+     * @return t if it isn't null
+     * @throws NullPointerException if t is null.
+     */
+    public static <T> T notNull(T t) {
+        if (t == null)
+            throw new NullPointerException();
+        else
+            return t;
+    }
+
+    /**
+     * Instantiate the class
+     */
+    public static Object newInstance(Class<?> c) {
+        try {
+            return c.newInstance();
+        } catch (IllegalAccessException e) {
+            throw new KafkaException("Could not instantiate class " + c.getName(), e);
+        } catch (InstantiationException e) {
+            throw new KafkaException("Could not instantiate class " + c.getName() + " Does it have a public no-argument constructor?", e);
+        }
+    }
+
+    /**
+     * Generates a 32-bit murmur2 hash from the given byte array
+     * @param data byte array to hash
+     * @return 32 bit hash of the given array
+     */
+    public static int murmur2(final byte[] data) {
+        int length = data.length;
+        int seed = 0x9747b28c;
+        // 'm' and 'r' are mixing constants generated offline.
+        // They're not really 'magic', they just happen to work well.
+        final int m = 0x5bd1e995;
+        final int r = 24;
+
+        // Initialize the hash to the seed mixed with the input length
+        int h = seed ^ length;
+        int length4 = length / 4;
+
+        for (int i = 0; i < length4; i++) {
+            final int i4 = i * 4;
+            int k = (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
+            k *= m;
+            k ^= k >>> r;
+            k *= m;
+            h *= m;
+            h ^= k;
+        }
+
+        // Handle the last few bytes of the input array
+        switch (length % 4) {
+            case 3:
+                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
+            case 2:
+                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
+            case 1:
+                h ^= (data[length & ~3] & 0xff);
+                h *= m;
+        }
+
+        h ^= h >>> 13;
+        h *= m;
+        h ^= h >>> 15;
+
+        return h;
+    }
+
+}
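
Two of the helpers above deserve a quick illustration: the unsigned-int read/write pair
round-trips values that do not fit in a signed int, and murmur2 combined with abs is the
kind of non-negative hashing one would use to map a key onto a partition. A small
illustrative driver (not part of the patch; the partitioning use is an assumption about
intent, not a quote of the producer's partitioner):

    import java.nio.ByteBuffer;
    import kafka.common.utils.Utils;

    public class UtilsExample {
        public static void main(String[] args) {
            // round-trip a value larger than Integer.MAX_VALUE
            ByteBuffer buffer = ByteBuffer.allocate(4);
            Utils.writeUnsignedInt(buffer, 0, 3000000000L);
            System.out.println(Utils.readUnsignedInt(buffer, 0)); // prints 3000000000

            // hash a key and map it to one of 8 partitions; abs() guards against
            // negative hash codes, including Integer.MIN_VALUE
            int partition = Utils.abs(Utils.murmur2(Utils.utf8("my-key"))) % 8;
            System.out.println(partition);
        }
    }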

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/clients/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/common/network/SelectorTest.java b/clients/src/test/java/kafka/clients/common/network/SelectorTest.java
new file mode 100644
index 0000000..68bc9ee
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/common/network/SelectorTest.java
@@ -0,0 +1,292 @@
+package kafka.clients.common.network;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import kafka.common.network.NetworkReceive;
+import kafka.common.network.NetworkSend;
+import kafka.common.network.Selectable;
+import kafka.common.network.Selector;
+import kafka.common.utils.Utils;
+import kafka.test.TestUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A set of tests for the selector. These use a test harness that runs a simple socket server that echoes back responses.
+ */
+public class SelectorTest {
+
+    private static final List<NetworkSend> EMPTY = new ArrayList<NetworkSend>();
+    private static final int BUFFER_SIZE = 4 * 1024;
+
+    private EchoServer server;
+    private Selectable selector;
+
+    @Before
+    public void setup() throws Exception {
+        this.server = new EchoServer();
+        this.server.start();
+        this.selector = new Selector();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        this.selector.close();
+        this.server.close();
+    }
+
+    /**
+     * Validate that when the server disconnects, a client send ends up with that node in the disconnected list.
+     */
+    @Test
+    public void testServerDisconnect() throws Exception {
+        int node = 0;
+
+        // connect and do a simple request
+        blockingConnect(node);
+        assertEquals("hello", blockingRequest(node, "hello"));
+
+        // disconnect
+        this.server.closeConnections();
+        while (!selector.disconnected().contains(node))
+            selector.poll(1000L, EMPTY);
+
+        // reconnect and do another request
+        blockingConnect(node);
+        assertEquals("hello", blockingRequest(node, "hello"));
+    }
+
+    /**
+     * Validate that the client can intentionally disconnect and reconnect
+     */
+    @Test
+    public void testClientDisconnect() throws Exception {
+        int node = 0;
+        blockingConnect(node);
+        selector.disconnect(node);
+        selector.poll(10, asList(createSend(node, "hello1")));
+        assertEquals("Request should not have succeeded", 0, selector.completedSends().size());
+        assertEquals("There should be a disconnect", 1, selector.disconnected().size());
+        assertTrue("The disconnect should be from our node", selector.disconnected().contains(node));
+        blockingConnect(node);
+        assertEquals("hello2", blockingRequest(node, "hello2"));
+    }
+
+    /**
+     * Sending a request with one already in flight should result in an exception
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testCantSendWithInProgress() throws Exception {
+        int node = 0;
+        blockingConnect(node);
+        selector.poll(1000L, asList(createSend(node, "test1"), createSend(node, "test2")));
+    }
+
+    /**
+     * Sending a request to a node without an existing connection should result in an exception
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testCantSendWithoutConnecting() throws Exception {
+        selector.poll(1000L, asList(createSend(0, "test")));
+    }
+
+    /**
+     * Sending a request to a node with a bad hostname should result in an exception during connect
+     */
+    @Test(expected = UnresolvedAddressException.class)
+    public void testNoRouteToHost() throws Exception {
+        selector.connect(0, new InetSocketAddress("asdf.asdf.dsc", server.port), BUFFER_SIZE, BUFFER_SIZE);
+    }
+
+    /**
+     * Sending a request to a node not listening on that port should result in disconnection
+     */
+    @Test
+    public void testConnectionRefused() throws Exception {
+        int node = 0;
+        selector.connect(node, new InetSocketAddress("localhost", TestUtils.choosePort()), BUFFER_SIZE, BUFFER_SIZE);
+        while (!selector.disconnected().contains(node))
+            selector.poll(1000L, EMPTY);
+    }
+
+    /**
+     * Send multiple requests to several connections in parallel. Validate that responses are received in the order that
+     * requests were sent.
+     */
+    @Test
+    public void testNormalOperation() throws Exception {
+        int conns = 5;
+        int reqs = 500;
+
+        // create connections
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        for (int i = 0; i < conns; i++)
+            selector.connect(i, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        // send echo requests and receive responses
+        int[] requests = new int[conns];
+        int[] responses = new int[conns];
+        int responseCount = 0;
+        List<NetworkSend> sends = new ArrayList<NetworkSend>();
+        for (int i = 0; i < conns; i++)
+            sends.add(createSend(i, i + "-" + 0));
+
+        // loop until we complete all requests
+        while (responseCount < conns * reqs) {
+            // do the i/o
+            selector.poll(0L, sends);
+
+            assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
+
+            // handle any responses we may have gotten
+            for (NetworkReceive receive : selector.completedReceives()) {
+                String[] pieces = asString(receive).split("-");
+                assertEquals("Should be in the form 'conn-counter'", 2, pieces.length);
+                assertEquals("Check the source", receive.source(), Integer.parseInt(pieces[0]));
+                assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position());
+                assertEquals("Check the request counter", responses[receive.source()], Integer.parseInt(pieces[1]));
+                responses[receive.source()]++; // increment the expected counter
+                responseCount++;
+            }
+
+            // prepare new sends for the next round
+            sends.clear();
+            for (NetworkSend send : selector.completedSends()) {
+                int dest = send.destination();
+                requests[dest]++;
+                if (requests[dest] < reqs)
+                    sends.add(createSend(dest, dest + "-" + requests[dest]));
+            }
+        }
+    }
+
+    /**
+     * Validate that we can send and receive a message larger than the receive and send buffer size
+     */
+    @Test
+    public void testSendLargeRequest() throws Exception {
+        int node = 0;
+        blockingConnect(node);
+        String big = TestUtils.randomString(10 * BUFFER_SIZE);
+        assertEquals(big, blockingRequest(node, big));
+    }
+
+    /**
+     * Test sending an empty string
+     */
+    @Test
+    public void testEmptyRequest() throws Exception {
+        int node = 0;
+        blockingConnect(node);
+        assertEquals("", blockingRequest(node, ""));
+    }
+
+    private String blockingRequest(int node, String s) throws IOException {
+        selector.poll(1000L, asList(createSend(node, s)));
+        while (true) {
+            selector.poll(1000L, EMPTY);
+            for (NetworkReceive receive : selector.completedReceives())
+                if (receive.source() == node)
+                    return asString(receive);
+        }
+    }
+
+    /* connect and wait for the connection to complete */
+    private void blockingConnect(int node) throws IOException {
+        selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+        while (!selector.connected().contains(node))
+            selector.poll(10000L, EMPTY);
+    }
+
+    private NetworkSend createSend(int node, String s) {
+        return new NetworkSend(node, ByteBuffer.wrap(s.getBytes()));
+    }
+
+    private String asString(NetworkReceive receive) {
+        return new String(Utils.toArray(receive.payload()));
+    }
+
+    /**
+     * A simple server that takes size-delimited byte arrays and echoes them back to the sender.
+     */
+    static class EchoServer extends Thread {
+        public final int port;
+        private final ServerSocket serverSocket;
+        private final List<Thread> threads;
+        private final List<Socket> sockets;
+
+        public EchoServer() throws Exception {
+            this.port = TestUtils.choosePort();
+            this.serverSocket = new ServerSocket(port);
+            this.threads = Collections.synchronizedList(new ArrayList<Thread>());
+            this.sockets = Collections.synchronizedList(new ArrayList<Socket>());
+        }
+
+        public void run() {
+            try {
+                while (true) {
+                    final Socket socket = serverSocket.accept();
+                    sockets.add(socket);
+                    Thread thread = new Thread() {
+                        public void run() {
+                            try {
+                                DataInputStream input = new DataInputStream(socket.getInputStream());
+                                DataOutputStream output = new DataOutputStream(socket.getOutputStream());
+                                while (socket.isConnected() && !socket.isClosed()) {
+                                    int size = input.readInt();
+                                    byte[] bytes = new byte[size];
+                                    input.readFully(bytes);
+                                    output.writeInt(size);
+                                    output.write(bytes);
+                                    output.flush();
+                                }
+                            } catch (IOException e) {
+                                // ignore
+                            } finally {
+                                try {
+                                    socket.close();
+                                } catch (IOException e) {
+                                    // ignore
+                                }
+                            }
+                        }
+                    };
+                    thread.start();
+                    threads.add(thread);
+                }
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+
+        public void closeConnections() throws IOException {
+            for (Socket socket : sockets)
+                socket.close();
+        }
+
+        public void close() throws IOException, InterruptedException {
+            this.serverSocket.close();
+            closeConnections();
+            for (Thread t : threads)
+                t.join();
+            join();
+        }
+    }
+
+}
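
The EchoServer above expects size-delimited frames: a 4-byte big-endian length followed by
that many payload bytes, which is the same framing the selector's NetworkSend/NetworkReceive
pair appears to use. Here is a hedged sketch of a plain blocking client speaking that framing
(illustrative only; the tests exercise it through the non-blocking Selector instead):

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.Socket;

    public class EchoClient {
        public static String echo(String host, int port, String message) throws IOException {
            Socket socket = new Socket(host, port);
            try {
                DataOutputStream out = new DataOutputStream(socket.getOutputStream());
                DataInputStream in = new DataInputStream(socket.getInputStream());
                byte[] payload = message.getBytes("UTF8");
                out.writeInt(payload.length); // 4-byte length prefix
                out.write(payload);
                out.flush();
                byte[] response = new byte[in.readInt()];
                in.readFully(response);
                return new String(response, "UTF8");
            } finally {
                socket.close();
            }
        }
    }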

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/clients/producer/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/kafka/clients/producer/BufferPoolTest.java
new file mode 100644
index 0000000..70603c4
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/producer/BufferPoolTest.java
@@ -0,0 +1,170 @@
+package kafka.clients.producer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import kafka.clients.producer.internals.BufferPool;
+import kafka.test.TestUtils;
+
+import org.junit.Test;
+
+public class BufferPoolTest {
+
+    /**
+     * Test the simple non-blocking allocation paths
+     */
+    @Test
+    public void testSimple() throws Exception {
+        int totalMemory = 64 * 1024;
+        int size = 1024;
+        BufferPool pool = new BufferPool(totalMemory, size, false);
+        ByteBuffer buffer = pool.allocate(size);
+        assertEquals("Buffer size should equal requested size.", size, buffer.limit());
+        assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
+        assertEquals("Available memory should have shrunk", totalMemory - size, pool.availableMemory());
+        buffer.putInt(1);
+        buffer.flip();
+        pool.deallocate(buffer);
+        assertEquals("All memory should be available", totalMemory, pool.availableMemory());
+        assertEquals("But now some is on the free list", totalMemory - size, pool.unallocatedMemory());
+        buffer = pool.allocate(size);
+        assertEquals("Recycled buffer should be cleared.", 0, buffer.position());
+        assertEquals("Recycled buffer should be cleared.", buffer.capacity(), buffer.limit());
+        pool.deallocate(buffer);
+        assertEquals("All memory should be available", totalMemory, pool.availableMemory());
+        assertEquals("Still a single buffer on the free list", totalMemory - size, pool.unallocatedMemory());
+        buffer = pool.allocate(2 * size);
+        pool.deallocate(buffer);
+        assertEquals("All memory should be available", totalMemory, pool.availableMemory());
+        assertEquals("Non-standard size didn't go to the free list.", totalMemory - size, pool.unallocatedMemory());
+    }
+
+    /**
+     * Test that we cannot allocate more memory than the pool contains in total
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
+        BufferPool pool = new BufferPool(1024, 512, true);
+        ByteBuffer buffer = pool.allocate(1024);
+        assertEquals(1024, buffer.limit());
+        pool.deallocate(buffer);
+        buffer = pool.allocate(1025);
+    }
+
+    @Test
+    public void testNonblockingMode() throws Exception {
+        BufferPool pool = new BufferPool(2, 1, false);
+        pool.allocate(1);
+        try {
+            pool.allocate(2);
+            fail("The buffer allocated more than it has!");
+        } catch (BufferExhaustedException e) {
+            // this is good
+        }
+    }
+
+    /**
+     * Test that delayed allocation blocks
+     */
+    @Test
+    public void testDelayedAllocation() throws Exception {
+        BufferPool pool = new BufferPool(5 * 1024, 1024, true);
+        ByteBuffer buffer = pool.allocate(1024);
+        CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
+        CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
+        assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1, allocation.getCount());
+        doDealloc.countDown(); // return the memory
+        allocation.await();
+    }
+
+    private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) {
+        final CountDownLatch latch = new CountDownLatch(1);
+        new Thread() {
+            public void run() {
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                pool.deallocate(buffer);
+            }
+        }.start();
+        return latch;
+    }
+
+    private CountDownLatch asyncAllocate(final BufferPool pool, final int size) {
+        final CountDownLatch completed = new CountDownLatch(1);
+        new Thread() {
+            public void run() {
+                try {
+                    pool.allocate(size);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } finally {
+                    completed.countDown();
+                }
+            }
+        }.start();
+        return completed;
+    }
+
+    /**
+     * This test creates lots of threads that hammer on the pool
+     */
+    @Test
+    public void testStressfulSituation() throws Exception {
+        int numThreads = 10;
+        final int iterations = 50000;
+        final int poolableSize = 1024;
+        final int totalMemory = numThreads / 2 * poolableSize;
+        final BufferPool pool = new BufferPool(totalMemory, poolableSize, true);
+        List<StressTestThread> threads = new ArrayList<StressTestThread>();
+        for (int i = 0; i < numThreads; i++)
+            threads.add(new StressTestThread(pool, iterations));
+        for (StressTestThread thread : threads)
+            thread.start();
+        for (StressTestThread thread : threads)
+            thread.join();
+        for (StressTestThread thread : threads)
+            assertTrue("Thread should have completed all iterations successfully.", thread.success.get());
+        assertEquals(totalMemory, pool.availableMemory());
+    }
+
+    public static class StressTestThread extends Thread {
+        private final int iterations;
+        private final BufferPool pool;
+        public final AtomicBoolean success = new AtomicBoolean(false);
+
+        public StressTestThread(BufferPool pool, int iterations) {
+            this.iterations = iterations;
+            this.pool = pool;
+        }
+
+        public void run() {
+            try {
+                for (int i = 0; i < iterations; i++) {
+                    int size;
+                    if (TestUtils.random.nextBoolean())
+                        // allocate poolable size
+                        size = pool.poolableSize();
+                    else
+                        // allocate a random size
+                        size = TestUtils.random.nextInt((int) pool.totalMemory());
+                    ByteBuffer buffer = pool.allocate(size);
+                    pool.deallocate(buffer);
+                }
+                success.set(true);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+}
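
The contract these tests pin down is allocate-then-deallocate: in blocking mode allocate()
waits until memory is returned, and in non-blocking mode it throws BufferExhaustedException
instead. A hedged usage sketch of that contract (illustrative; real callers should return
buffers to the pool even on failure, which the stress test above does implicitly):

    import java.nio.ByteBuffer;
    import kafka.clients.producer.internals.BufferPool;

    public class BufferPoolUsage {
        public static void fillAndRelease(BufferPool pool) throws InterruptedException {
            // in blocking mode this waits for memory if the pool is exhausted
            ByteBuffer buffer = pool.allocate(pool.poolableSize());
            try {
                buffer.putLong(42L); // use the pooled buffer
            } finally {
                // always give the buffer back, or the pool's memory leaks away
                pool.deallocate(buffer);
            }
        }
    }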

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/clients/producer/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/kafka/clients/producer/MetadataTest.java
new file mode 100644
index 0000000..68e4bd7
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/producer/MetadataTest.java
@@ -0,0 +1,55 @@
+package kafka.clients.producer;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import kafka.clients.producer.internals.Metadata;
+import kafka.common.Cluster;
+import kafka.common.Node;
+import kafka.common.PartitionInfo;
+
+import org.junit.Test;
+
+public class MetadataTest {
+
+    private long refreshBackoffMs = 100;
+    private long metadataExpireMs = 1000;
+    private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
+
+    @Test
+    public void testMetadata() throws Exception {
+        long time = 0;
+        metadata.update(Cluster.empty(), time);
+        assertFalse("No update needed.", metadata.needsUpdate(time));
+        metadata.forceUpdate();
+        assertFalse("Still no updated needed due to backoff", metadata.needsUpdate(time));
+        time += refreshBackoffMs;
+        assertTrue("Update needed now that backoff time expired", metadata.needsUpdate(time));
+        String topic = "my-topic";
+        Thread t1 = asyncFetch(topic);
+        Thread t2 = asyncFetch(topic);
+        assertTrue("Awaiting update", t1.isAlive());
+        assertTrue("Awaiting update", t2.isAlive());
+        metadata.update(clusterWith(topic), time);
+        t1.join();
+        t2.join();
+        assertFalse("No update needed.", metadata.needsUpdate(time));
+        time += metadataExpireMs;
+        assertTrue("Update needed due to stale metadata.", metadata.needsUpdate(time));
+    }
+
+    private Cluster clusterWith(String topic) {
+        return new Cluster(asList(new Node(0, "localhost", 1969)), asList(new PartitionInfo(topic, 0, 0, new int[0], new int[0])));
+    }
+
+    private Thread asyncFetch(final String topic) {
+        Thread thread = new Thread() {
+            public void run() {
+                metadata.fetch(topic, Integer.MAX_VALUE);
+            }
+        };
+        thread.start();
+        return thread;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
new file mode 100644
index 0000000..61929a4
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
@@ -0,0 +1,66 @@
+package kafka.clients.producer;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import kafka.common.Cluster;
+import kafka.common.Node;
+import kafka.common.PartitionInfo;
+import kafka.common.Serializer;
+import kafka.common.StringSerialization;
+
+import org.junit.Test;
+
+public class MockProducerTest {
+
+    @Test
+    public void testAutoCompleteMock() {
+        MockProducer producer = new MockProducer(true);
+        ProducerRecord record = new ProducerRecord("topic", "key", "value");
+        RecordSend send = producer.send(record);
+        assertTrue("Send should be immediately complete", send.completed());
+        assertFalse("Send should be successful", send.hasError());
+        assertEquals("Offset should be 0", 0, send.offset());
+        assertEquals("We should have the record in our history", asList(record), producer.history());
+        producer.clear();
+        assertEquals("Clear should erase our history", 0, producer.history().size());
+    }
+
+    @Test
+    public void testManualCompletion() {
+        MockProducer producer = new MockProducer(false);
+        ProducerRecord record1 = new ProducerRecord("topic", "key1", "value1");
+        ProducerRecord record2 = new ProducerRecord("topic", "key2", "value2");
+        RecordSend send1 = producer.send(record1);
+        assertFalse("Send shouldn't have completed", send1.completed());
+        RecordSend send2 = producer.send(record2);
+        assertFalse("Send shouldn't have completed", send2.completed());
+        assertTrue("Complete the first request", producer.completeNext());
+        assertFalse("Requst should be successful", send1.hasError());
+        assertFalse("Second request still incomplete", send2.completed());
+        IllegalArgumentException e = new IllegalArgumentException("blah");
+        assertTrue("Complete the second request with an error", producer.errorNext(e));
+        try {
+            send2.await();
+            fail("Expected error to be thrown");
+        } catch (IllegalArgumentException err) {
+            // this is good
+        }
+        assertFalse("No more requests to complete", producer.completeNext());
+    }
+
+    @Test
+    public void testSerializationAndPartitioning() {
+        Cluster cluster = new Cluster(asList(new Node(0, "host", -1)), asList(new PartitionInfo("topic",
+                                                                                                0,
+                                                                                                0,
+                                                                                                new int[] { 0 },
+                                                                                                new int[] { 0 })));
+        Serializer serializer = new StringSerialization();
+        Partitioner partitioner = new DefaultPartitioner();
+        MockProducer producer = new MockProducer(serializer, serializer, partitioner, cluster, true);
+        ProducerRecord record = new ProducerRecord("topic", "key", "value");
+        RecordSend send = producer.send(record);
+        assertTrue("Send should be immediately complete", send.completed());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java
new file mode 100644
index 0000000..b1ab361
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java
@@ -0,0 +1,135 @@
+package kafka.clients.producer;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import kafka.clients.producer.internals.RecordAccumulator;
+import kafka.clients.producer.internals.RecordBatch;
+import kafka.common.TopicPartition;
+import kafka.common.metrics.Metrics;
+import kafka.common.record.CompressionType;
+import kafka.common.record.LogEntry;
+import kafka.common.record.Record;
+import kafka.common.record.Records;
+import kafka.common.utils.MockTime;
+
+import org.junit.Test;
+
+public class RecordAccumulatorTest {
+
+    private TopicPartition tp = new TopicPartition("test", 0);
+    private MockTime time = new MockTime();
+    private byte[] key = "key".getBytes();
+    private byte[] value = "value".getBytes();
+    private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
+    private Metrics metrics = new Metrics(time);
+
+    @Test
+    public void testFull() throws Exception {
+        long now = time.milliseconds();
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, false, metrics, time);
+        int appends = 1024 / msgSize;
+        for (int i = 0; i < appends; i++) {
+            accum.append(tp, key, value, CompressionType.NONE, null);
+            assertEquals("No partitions should be ready.", 0, accum.ready(now).size());
+        }
+        accum.append(tp, key, value, CompressionType.NONE, null);
+        assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
+        List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE);
+        assertEquals(1, batches.size());
+        RecordBatch batch = batches.get(0);
+        Iterator<LogEntry> iter = batch.records.iterator();
+        for (int i = 0; i < appends; i++) {
+            LogEntry entry = iter.next();
+            assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
+            assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
+        }
+        assertFalse("No more records", iter.hasNext());
+    }
+
+    @Test
+    public void testAppendLarge() throws Exception {
+        int batchSize = 512;
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, false, metrics, time);
+        accum.append(tp, key, new byte[2 * batchSize], CompressionType.NONE, null);
+        assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
+    }
+
+    @Test
+    public void testLinger() throws Exception {
+        long lingerMs = 10L;
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, false, metrics, time);
+        accum.append(tp, key, value, CompressionType.NONE, null);
+        assertEquals("No partitions should be ready", 0, accum.ready(time.milliseconds()).size());
+        time.sleep(10);
+        assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
+        List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE);
+        assertEquals(1, batches.size());
+        RecordBatch batch = batches.get(0);
+        Iterator<LogEntry> iter = batch.records.iterator();
+        LogEntry entry = iter.next();
+        assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
+        assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
+        assertFalse("No more records", iter.hasNext());
+    }
+
+    @Test
+    public void testPartialDrain() throws Exception {
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, false, metrics, time);
+        int appends = 1024 / msgSize + 1;
+        List<TopicPartition> partitions = asList(new TopicPartition("test", 0), new TopicPartition("test", 1));
+        for (TopicPartition tp : partitions) {
+            for (int i = 0; i < appends; i++)
+                accum.append(tp, key, value, CompressionType.NONE, null);
+        }
+        assertEquals("Both partitions should be ready", 2, accum.ready(time.milliseconds()).size());
+
+        List<RecordBatch> batches = accum.drain(partitions, 1024);
+        assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
+    }
+
+    @Test
+    public void testStressfulSituation() throws Exception {
+        final int numThreads = 5;
+        final int msgs = 10000;
+        final int numParts = 10;
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, true, metrics, time);
+        List<Thread> threads = new ArrayList<Thread>();
+        for (int i = 0; i < numThreads; i++) {
+            threads.add(new Thread() {
+                public void run() {
+                    for (int i = 0; i < msgs; i++) {
+                        try {
+                            accum.append(new TopicPartition("test", i % numParts), key, value, CompressionType.NONE, null);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+            });
+        }
+        for (Thread t : threads)
+            t.start();
+        int read = 0;
+        long now = time.milliseconds();
+        while (read < numThreads * msgs) {
+            List<TopicPartition> tps = accum.ready(now);
+            List<RecordBatch> batches = accum.drain(tps, 5 * 1024);
+            for (RecordBatch batch : batches) {
+                for (LogEntry entry : batch.records)
+                    read++;
+            }
+            accum.deallocate(batches);
+        }
+
+        for (Thread t : threads)
+            t.join();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
new file mode 100644
index 0000000..f8fd14b
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
@@ -0,0 +1,76 @@
+package kafka.clients.producer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.TimeUnit;
+
+import kafka.clients.producer.internals.ProduceRequestResult;
+import kafka.common.TopicPartition;
+import kafka.common.errors.CorruptMessageException;
+import kafka.common.errors.TimeoutException;
+
+import org.junit.Test;
+
+public class RecordSendTest {
+
+    private TopicPartition topicPartition = new TopicPartition("test", 0);
+    private long baseOffset = 45;
+    private long relOffset = 5;
+
+    /**
+     * Test that waiting on a request that never completes times out
+     */
+    @Test
+    public void testTimeout() {
+        ProduceRequestResult request = new ProduceRequestResult();
+        RecordSend send = new RecordSend(relOffset, request);
+        assertFalse("Request is not completed", send.completed());
+        try {
+            send.await(5, TimeUnit.MILLISECONDS);
+            fail("Should have thrown exception.");
+        } catch (TimeoutException e) {
+            /* this is good */
+        }
+
+        request.done(topicPartition, baseOffset, null);
+        assertTrue(send.completed());
+        assertEquals(baseOffset + relOffset, send.offset());
+    }
+
+    /**
+     * Test that an asynchronous request will eventually throw the right exception
+     */
+    @Test(expected = CorruptMessageException.class)
+    public void testError() {
+        RecordSend send = new RecordSend(relOffset, asyncRequest(baseOffset, new CorruptMessageException(), 50L));
+        send.await();
+    }
+
+    /**
+     * Test that an asynchronous request will eventually return the right offset
+     */
+    @Test
+    public void testBlocking() {
+        RecordSend send = new RecordSend(relOffset, asyncRequest(baseOffset, null, 50L));
+        assertEquals(baseOffset + relOffset, send.offset());
+    }
+
+    /* create a new request result that will be completed after the given timeout */
+    public ProduceRequestResult asyncRequest(final long baseOffset, final RuntimeException error, final long timeout) {
+        final ProduceRequestResult request = new ProduceRequestResult();
+        new Thread() {
+            public void run() {
+                try {
+                    sleep(timeout);
+                    request.done(topicPartition, baseOffset, error);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }.start();
+        return request;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/SenderTest.java b/clients/src/test/java/kafka/clients/producer/SenderTest.java
new file mode 100644
index 0000000..73f1aba
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/producer/SenderTest.java
@@ -0,0 +1,92 @@
+package kafka.clients.producer;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+
+import kafka.clients.producer.internals.Metadata;
+import kafka.clients.producer.internals.RecordAccumulator;
+import kafka.clients.producer.internals.Sender;
+import kafka.common.Cluster;
+import kafka.common.Node;
+import kafka.common.PartitionInfo;
+import kafka.common.TopicPartition;
+import kafka.common.metrics.Metrics;
+import kafka.common.network.NetworkReceive;
+import kafka.common.protocol.ApiKeys;
+import kafka.common.protocol.Errors;
+import kafka.common.protocol.ProtoUtils;
+import kafka.common.protocol.types.Struct;
+import kafka.common.record.CompressionType;
+import kafka.common.requests.RequestSend;
+import kafka.common.requests.ResponseHeader;
+import kafka.common.utils.MockTime;
+import kafka.test.MockSelector;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class SenderTest {
+
+    private MockTime time = new MockTime();
+    private MockSelector selector = new MockSelector(time);
+    private int batchSize = 16 * 1024;
+    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+    private Cluster cluster = new Cluster(asList(new Node(0, "localhost", 1969)), asList(new PartitionInfo("test",
+                                                                                                           0,
+                                                                                                           0,
+                                                                                                           new int[0],
+                                                                                                           new int[0])));
+    private Metrics metrics = new Metrics(time);
+    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, false, metrics, time);
+    private Sender sender = new Sender(selector, metadata, this.accumulator, "", 1024 * 1024, 0L, (short) -1, 10000, time);
+
+    @Before
+    public void setup() {
+        metadata.update(cluster, time.milliseconds());
+    }
+
+    @Test
+    public void testSimple() throws Exception {
+        TopicPartition tp = new TopicPartition("test", 0);
+        RecordSend send = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+        sender.run(time.milliseconds());
+        assertEquals("We should have connected", 1, selector.connected().size());
+        selector.clear();
+        sender.run(time.milliseconds());
+        assertEquals("Single request should be sent", 1, selector.completedSends().size());
+        RequestSend request = (RequestSend) selector.completedSends().get(0);
+        selector.clear();
+        long offset = 42;
+        selector.completeReceive(produceResponse(request.header().correlationId(),
+                                                 cluster.leaderFor(tp).id(),
+                                                 tp.topic(),
+                                                 tp.partition(),
+                                                 offset,
+                                                 Errors.NONE.code()));
+        sender.run(time.milliseconds());
+        assertTrue("Request should be completed", send.completed());
+        assertEquals(offset, send.offset());
+    }
+
+    private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) {
+        Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
+        Struct response = struct.instance("responses");
+        response.set("topic", topic);
+        Struct partResp = response.instance("partition_responses");
+        partResp.set("partition", part);
+        partResp.set("error_code", (short) error);
+        partResp.set("base_offset", offset);
+        response.set("partition_responses", new Object[] { partResp });
+        struct.set("responses", new Object[] { response });
+        ResponseHeader header = new ResponseHeader(correlation);
+        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + struct.sizeOf());
+        header.writeTo(buffer);
+        struct.writeTo(buffer);
+        buffer.rewind();
+        return new NetworkReceive(source, buffer);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/kafka/common/config/ConfigDefTest.java
new file mode 100644
index 0000000..a6a91ac
--- /dev/null
+++ b/clients/src/test/java/kafka/common/config/ConfigDefTest.java
@@ -0,0 +1,88 @@
+package kafka.common.config;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import kafka.common.config.ConfigDef.Range;
+import kafka.common.config.ConfigDef.Type;
+
+import org.junit.Test;
+
+public class ConfigDefTest {
+
+    @Test
+    public void testBasicTypes() {
+        ConfigDef def = new ConfigDef().define("a", Type.INT, 5, Range.between(0, 14), "docs")
+                                       .define("b", Type.LONG, "docs")
+                                       .define("c", Type.STRING, "hello", "docs")
+                                       .define("d", Type.LIST, "docs")
+                                       .define("e", Type.DOUBLE, "docs")
+                                       .define("f", Type.CLASS, "docs")
+                                       .define("g", Type.BOOLEAN, "docs");
+
+        Properties props = new Properties();
+        props.put("a", "1   ");
+        props.put("b", 2);
+        props.put("d", " a , b, c");
+        props.put("e", 42.5d);
+        props.put("f", String.class.getName());
+        props.put("g", "true");
+
+        Map<String, Object> vals = def.parse(props);
+        assertEquals(1, vals.get("a"));
+        assertEquals(2L, vals.get("b"));
+        assertEquals("hello", vals.get("c"));
+        assertEquals(asList("a", "b", "c"), vals.get("d"));
+        assertEquals(42.5d, vals.get("e"));
+        assertEquals(String.class, vals.get("f"));
+        assertEquals(true, vals.get("g"));
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidDefault() {
+        new ConfigDef().define("a", Type.INT, "hello", "docs");
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testNullDefault() {
+        new ConfigDef().define("a", Type.INT, null, null, "docs");
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissingRequired() {
+        new ConfigDef().define("a", Type.INT, "docs").parse(new HashMap<String, Object>());
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testDefinedTwice() {
+        new ConfigDef().define("a", Type.STRING, "docs").define("a", Type.INT, "docs");
+    }
+
+    @Test
+    public void testBadInputs() {
+        testBadInputs(Type.INT, "hello", null, "42.5", 42.5, Long.MAX_VALUE, Long.toString(Long.MAX_VALUE), new Object());
+        testBadInputs(Type.LONG, "hello", null, "42.5", Long.toString(Long.MAX_VALUE) + "00", new Object());
+        testBadInputs(Type.DOUBLE, "hello", null, new Object());
+        testBadInputs(Type.STRING, new Object());
+        testBadInputs(Type.LIST, 53, new Object());
+    }
+
+    private void testBadInputs(Type type, Object... values) {
+        for (Object value : values) {
+            Map<String, Object> m = new HashMap<String, Object>();
+            m.put("name", value);
+            ConfigDef def = new ConfigDef().define("name", type, "docs");
+            try {
+                def.parse(m);
+                fail("Expected a config exception on bad input for value " + value);
+            } catch (ConfigException e) {
+                // this is good
+            }
+        }
+    }
+}
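
ConfigDef couples a typed schema with validation at parse time: defaults, required keys,
type coercion, and range checks all fail fast with a ConfigException. A sketch of how a
client-side config class might wrap it (the key names here are illustrative, not taken
from the patch):

    import java.util.Map;
    import java.util.Properties;
    import kafka.common.config.ConfigDef;
    import kafka.common.config.ConfigDef.Range;
    import kafka.common.config.ConfigDef.Type;

    public class MyClientConfig {
        private static final ConfigDef DEF = new ConfigDef()
            .define("retries", Type.INT, 0, Range.between(0, Integer.MAX_VALUE), "number of send retries")
            .define("client.id", Type.STRING, "", "an id string for tracing");

        public static Map<String, Object> parse(Properties props) {
            // throws ConfigException on missing required keys, bad types, or out-of-range values
            return DEF.parse(props);
        }
    }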

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/metrics/JmxReporterTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/kafka/common/metrics/JmxReporterTest.java
new file mode 100644
index 0000000..e286261
--- /dev/null
+++ b/clients/src/test/java/kafka/common/metrics/JmxReporterTest.java
@@ -0,0 +1,21 @@
+package kafka.common.metrics;
+
+import kafka.common.metrics.stats.Avg;
+import kafka.common.metrics.stats.Total;
+
+import org.junit.Test;
+
+public class JmxReporterTest {
+
+    @Test
+    public void testJmxRegistration() throws Exception {
+        Metrics metrics = new Metrics();
+        metrics.addReporter(new JmxReporter());
+        Sensor sensor = metrics.sensor("kafka.requests");
+        sensor.add("pack.bean1.avg", new Avg());
+        sensor.add("pack.bean2.total", new Total());
+        Sensor sensor2 = metrics.sensor("kafka.blah");
+        sensor2.add("pack.bean1.some", new Total());
+        sensor2.add("pack.bean2.some", new Total());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/kafka/common/metrics/MetricsTest.java
new file mode 100644
index 0000000..7d06864
--- /dev/null
+++ b/clients/src/test/java/kafka/common/metrics/MetricsTest.java
@@ -0,0 +1,176 @@
+package kafka.common.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import kafka.common.metrics.stats.Avg;
+import kafka.common.metrics.stats.Count;
+import kafka.common.metrics.stats.Max;
+import kafka.common.metrics.stats.Min;
+import kafka.common.metrics.stats.Percentile;
+import kafka.common.metrics.stats.Percentiles;
+import kafka.common.metrics.stats.Percentiles.BucketSizing;
+import kafka.common.metrics.stats.Rate;
+import kafka.common.metrics.stats.Total;
+import kafka.common.utils.MockTime;
+
+import org.junit.Test;
+
+public class MetricsTest {
+
+    private static double EPS = 0.000001;
+
+    MockTime time = new MockTime();
+    Metrics metrics = new Metrics(new MetricConfig(), Arrays.asList((MetricsReporter) new JmxReporter()), time);
+
+    @Test
+    public void testSimpleStats() throws Exception {
+        ConstantMeasurable measurable = new ConstantMeasurable();
+        metrics.addMetric("direct.measurable", measurable);
+        Sensor s = metrics.sensor("test.sensor");
+        s.add("test.avg", new Avg());
+        s.add("test.max", new Max());
+        s.add("test.min", new Min());
+        s.add("test.rate", new Rate(TimeUnit.SECONDS));
+        s.add("test.occurences", new Rate(TimeUnit.SECONDS, new Count()));
+        s.add("test.count", new Count());
+        s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT,
+                              new Percentile("test.median", 50.0),
+                              new Percentile("test.perc99_9", 99.9)));
+
+        Sensor s2 = metrics.sensor("test.sensor2");
+        s2.add("s2.total", new Total());
+        s2.record(5.0);
+
+        for (int i = 0; i < 10; i++)
+            s.record(i);
+
+        // pretend 2 seconds passed...
+        time.sleep(2000);
+
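+        // sum(0..9) = 45 recorded over 2 seconds gives a rate of 22.5/s; 10 events over 2 seconds gives 5 occurrences/s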
+        assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get("s2.total").value(), EPS);
+        assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get("test.avg").value(), EPS);
+        assertEquals("Max(0...9) = 9", 9.0, metrics.metrics().get("test.max").value(), EPS);
+        assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get("test.min").value(), EPS);
+        assertEquals("Rate(0...9) = 22.5", 22.5, metrics.metrics().get("test.rate").value(), EPS);
+        assertEquals("Occurences(0...9) = 5", 5.0, metrics.metrics().get("test.occurences").value(), EPS);
+        assertEquals("Count(0...9) = 10", 10.0, metrics.metrics().get("test.count").value(), EPS);
+    }
+
+    @Test
+    public void testHierarchicalSensors() {
+        Sensor parent1 = metrics.sensor("test.parent1");
+        parent1.add("test.parent1.count", new Count());
+        Sensor parent2 = metrics.sensor("test.parent2");
+        parent2.add("test.parent2.count", new Count());
+        Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
+        child1.add("test.child1.count", new Count());
+        Sensor child2 = metrics.sensor("test.child2", parent1);
+        child2.add("test.child2.count", new Count());
+        Sensor grandchild = metrics.sensor("test.grandchild", child1);
+        grandchild.add("test.grandchild.count", new Count());
+
+        /* increment each sensor one time */
+        parent1.record();
+        parent2.record();
+        child1.record();
+        child2.record();
+        grandchild.record();
+
+        double p1 = parent1.metrics().get(0).value();
+        double p2 = parent2.metrics().get(0).value();
+        double c1 = child1.metrics().get(0).value();
+        double c2 = child2.metrics().get(0).value();
+        double gc = grandchild.metrics().get(0).value();
+
+        /* each sensor's count should be one (its own record) plus the sum of its children's counts */
+        assertEquals(1.0, gc, EPS);
+        assertEquals(1.0 + gc, c1, EPS);
+        assertEquals(1.0, c2, EPS);
+        assertEquals(1.0 + c1, p2, EPS);
+        assertEquals(1.0 + c1 + c2, p1, EPS);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBadSensorHierarchy() {
+        Sensor p = metrics.sensor("parent");
+        Sensor c1 = metrics.sensor("child1", p);
+        Sensor c2 = metrics.sensor("child2", p);
+        metrics.sensor("gc", c1, c2); // should fail
+    }
+
+    @Test
+    public void testEventWindowing() {
+        Count count = new Count();
+        MetricConfig config = new MetricConfig().eventWindow(1).samples(2);
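+        // each sample window holds a single event and two samples are kept, so only the two newest events are counted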
+        count.record(config, 1.0, time.nanoseconds());
+        count.record(config, 1.0, time.nanoseconds());
+        assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS);
+        count.record(config, 1.0, time.nanoseconds()); // the third event pushes the first out of the two-sample window
+        assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS);
+    }
+
+    @Test
+    public void testTimeWindowing() {
+        Count count = new Count();
+        MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS).samples(2);
+        count.record(config, 1.0, time.nanoseconds());
+        time.sleep(1);
+        count.record(config, 1.0, time.nanoseconds());
+        assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS);
+        time.sleep(1);
+        count.record(config, 1.0, time.nanoseconds()); // oldest event times out
+        assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS);
+    }
+
+    @Test
+    public void testOldDataHasNoEffect() {
+        Max max = new Max();
+        long windowMs = 100;
+        MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS);
+        max.record(config, 50, time.nanoseconds());
+        time.sleep(windowMs);
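+        // the only recorded sample has now expired, so the max falls back to its initial value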
+        assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.nanoseconds()), EPS);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDuplicateMetricName() {
+        metrics.sensor("test").add("test", new Avg());
+        metrics.sensor("test2").add("test", new Total());
+    }
+
+    @Test
+    public void testQuotas() {
+        Sensor sensor = metrics.sensor("test");
+        sensor.add("test1.total", new Total(), new MetricConfig().quota(Quota.lessThan(5.0)));
+        sensor.add("test2.total", new Total(), new MetricConfig().quota(Quota.moreThan(0.0)));
+        sensor.record(5.0);
+        try {
+            sensor.record(1.0);
+            fail("Should have gotten a quota violation.");
+        } catch (QuotaViolationException e) {
+            // this is good
+        }
+        assertEquals(6.0, metrics.metrics().get("test1.total").value(), EPS);
+        sensor.record(-6.0);
+        try {
+            sensor.record(-1.0);
+            fail("Should have gotten a quota violation.");
+        } catch (QuotaViolationException e) {
+            // this is good
+        }
+    }
+
+    public static class ConstantMeasurable implements Measurable {
+        public double value = 0.0;
+
+        @Override
+        public double measure(MetricConfig config, long now) {
+            return value;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java b/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java
new file mode 100644
index 0000000..03bdd2b
--- /dev/null
+++ b/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java
@@ -0,0 +1,56 @@
+package kafka.common.metrics.stats;
+
+import static org.junit.Assert.assertEquals;
+import kafka.common.metrics.stats.Histogram.BinScheme;
+import kafka.common.metrics.stats.Histogram.ConstantBinScheme;
+import kafka.common.metrics.stats.Histogram.LinearBinScheme;
+
+import org.junit.Test;
+
+public class HistogramTest {
+
+    private static final double EPS = 0.0000001d;
+
+    // @Test
+    public void testHistogram() {
+        BinScheme scheme = new ConstantBinScheme(12, -5, 5);
+        Histogram hist = new Histogram(scheme);
+        for (int i = -5; i < 5; i++)
+            hist.record(i);
+        for (int i = 0; i < 10; i++)
+            assertEquals(scheme.fromBin(i + 1), hist.value(i / 10.0), EPS);
+    }
+
+    @Test
+    public void testConstantBinScheme() {
+        ConstantBinScheme scheme = new ConstantBinScheme(5, -5, 5);
+        assertEquals("A value below the lower bound should map to the first bin", 0, scheme.toBin(-5.01));
+        assertEquals("A value above the upper bound should map to the last bin", 4, scheme.toBin(5.01));
+        assertEquals("Check boundary of bucket 1", 1, scheme.toBin(-5));
+        assertEquals("Check boundary of bucket 4", 4, scheme.toBin(5));
+        assertEquals("Check boundary of bucket 3", 3, scheme.toBin(4.9999));
+        checkBinningConsistency(new ConstantBinScheme(4, 0, 5));
+        checkBinningConsistency(scheme);
+    }
+
+    @Test
+    public void testLinearBinScheme() {
+        LinearBinScheme scheme = new LinearBinScheme(5, 5);
+        checkBinningConsistency(scheme);
+    }
+
+    private void checkBinningConsistency(BinScheme scheme) {
+        for (int bin = 0; bin < scheme.bins(); bin++) {
+            double fromBin = scheme.fromBin(bin);
+            int binAgain = scheme.toBin(fromBin);
+            assertEquals("unbinning and rebinning " + bin
+                         + " gave a different result ("
+                         + fromBin
+                         + " was placed in bin "
+                         + binAgain
+                         + " )", bin, binAgain);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java
new file mode 100644
index 0000000..5204f3a
--- /dev/null
+++ b/clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -0,0 +1,96 @@
+package kafka.common.protocol.types;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class ProtocolSerializationTest {
+
+    private Schema schema;
+    private Struct struct;
+
+    @Before
+    public void setup() {
+        this.schema = new Schema(new Field("int8", Type.INT8),
+                                 new Field("int16", Type.INT16),
+                                 new Field("int32", Type.INT32),
+                                 new Field("int64", Type.INT64),
+                                 new Field("string", Type.STRING),
+                                 new Field("bytes", Type.BYTES),
+                                 new Field("array", new ArrayOf(Type.INT32)),
+                                 new Field("struct", new Schema(new Field("field", Type.INT32))));
+        this.struct = new Struct(this.schema).set("int8", (byte) 1)
+                                             .set("int16", (short) 1)
+                                             .set("int32", (int) 1)
+                                             .set("int64", (long) 1)
+                                             .set("string", "1")
+                                             .set("bytes", "1".getBytes())
+                                             .set("array", new Object[] { 1 });
+        this.struct.set("struct", this.struct.instance("struct").set("field", new Object[] { 1, 2, 3 }));
+    }
+
+    @Test
+    public void testSimple() {
+        check(Type.INT8, (byte) -111);
+        check(Type.INT16, (short) -11111);
+        check(Type.INT32, -11111111);
+        check(Type.INT64, -11111111111L);
+        check(Type.STRING, "");
+        check(Type.STRING, "hello");
+        check(Type.STRING, "A\u00ea\u00f1\u00fcC");
+        check(Type.BYTES, ByteBuffer.allocate(0));
+        check(Type.BYTES, ByteBuffer.wrap("abcd".getBytes()));
+        check(new ArrayOf(Type.INT32), new Object[] { 1, 2, 3, 4 });
+        check(new ArrayOf(Type.STRING), new Object[] {});
+        check(new ArrayOf(Type.STRING), new Object[] { "hello", "there", "beautiful" });
+    }
+
+    @Test
+    public void testNulls() {
+        for (Field f : this.schema.fields()) {
+            Object o = this.struct.get(f);
+            try {
+                this.struct.set(f, null);
+                this.struct.validate();
+                fail("Should not allow serialization of null value.");
+            } catch (SchemaException e) {
+                // this is good
+                this.struct.set(f, o);
+            }
+        }
+    }
+
+    @Test
+    public void testDefault() {
+        Schema schema = new Schema(new Field("field", Type.INT32, "doc", 42));
+        Struct struct = new Struct(schema);
+        assertEquals("Should get the default value", 42, struct.get("field"));
+        struct.validate(); // should be valid even with missing value
+    }
+
+    private Object roundtrip(Type type, Object obj) {
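+        // write into an exactly-sized buffer, then read back, checking that every byte is both produced and consumed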
+        ByteBuffer buffer = ByteBuffer.allocate(type.sizeOf(obj));
+        type.write(buffer, obj);
+        assertFalse("The buffer should now be full.", buffer.hasRemaining());
+        buffer.rewind();
+        Object read = type.read(buffer);
+        assertFalse("All bytes should have been read.", buffer.hasRemaining());
+        return read;
+    }
+
+    private void check(Type type, Object obj) {
+        Object result = roundtrip(type, obj);
+        if (obj instanceof Object[]) {
+            obj = Arrays.asList((Object[]) obj);
+            result = Arrays.asList((Object[]) result);
+        }
+        assertEquals("The object read back should be the same as what was written.", obj, result);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/kafka/common/record/MemoryRecordsTest.java
new file mode 100644
index 0000000..6906309
--- /dev/null
+++ b/clients/src/test/java/kafka/common/record/MemoryRecordsTest.java
@@ -0,0 +1,44 @@
+package kafka.common.record;
+
+import static kafka.common.utils.Utils.toArray;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Test;
+
+public class MemoryRecordsTest {
+
+    @Test
+    public void testIterator() {
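+        // build the same three records two ways: appending pre-built Records and appending raw key/value bytes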
+        MemoryRecords recs1 = new MemoryRecords(ByteBuffer.allocate(1024));
+        MemoryRecords recs2 = new MemoryRecords(ByteBuffer.allocate(1024));
+        List<Record> list = Arrays.asList(new Record("a".getBytes(), "1".getBytes()),
+                                          new Record("b".getBytes(), "2".getBytes()),
+                                          new Record("c".getBytes(), "3".getBytes()));
+        for (int i = 0; i < list.size(); i++) {
+            Record r = list.get(i);
+            recs1.append(i, r);
+            recs2.append(i, toArray(r.key()), toArray(r.value()), r.compressionType());
+        }
+
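+        // iterate each set twice to confirm that iteration is repeatable and does not consume the records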
+        for (int iteration = 0; iteration < 2; iteration++) {
+            for (MemoryRecords recs : Arrays.asList(recs1, recs2)) {
+                Iterator<LogEntry> iter = recs.iterator();
+                for (int i = 0; i < list.size(); i++) {
+                    assertTrue(iter.hasNext());
+                    LogEntry entry = iter.next();
+                    assertEquals((long) i, entry.offset());
+                    assertEquals(list.get(i), entry.record());
+                }
+                assertFalse(iter.hasNext());
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/record/RecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/record/RecordTest.java b/clients/src/test/java/kafka/common/record/RecordTest.java
new file mode 100644
index 0000000..9c59c9b
--- /dev/null
+++ b/clients/src/test/java/kafka/common/record/RecordTest.java
@@ -0,0 +1,87 @@
+package kafka.common.record;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(value = Parameterized.class)
+public class RecordTest {
+
+    private ByteBuffer key;
+    private ByteBuffer value;
+    private CompressionType compression;
+    private Record record;
+
+    public RecordTest(byte[] key, byte[] value, CompressionType compression) {
+        this.key = key == null ? null : ByteBuffer.wrap(key);
+        this.value = value == null ? null : ByteBuffer.wrap(value);
+        this.compression = compression;
+        this.record = new Record(key, value, compression);
+    }
+
+    @Test
+    public void testFields() {
+        assertEquals(compression, record.compressionType());
+        assertEquals(key != null, record.hasKey());
+        assertEquals(key, record.key());
+        if (key != null)
+            assertEquals(key.limit(), record.keySize());
+        assertEquals(Record.CURRENT_MAGIC_VALUE, record.magic());
+        assertEquals(value, record.value());
+        if (value != null)
+            assertEquals(value.limit(), record.valueSize());
+    }
+
+    @Test
+    public void testChecksum() {
+        assertEquals(record.checksum(), record.computeChecksum());
+        assertTrue(record.isValid());
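+        // corrupt each byte after the CRC field in turn; every corruption must invalidate the record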
+        for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) {
+            Record copy = copyOf(record);
+            copy.buffer().put(i, (byte) 69);
+            assertFalse(copy.isValid());
+            try {
+                copy.ensureValid();
+                fail("Should fail the above test.");
+            } catch (InvalidRecordException e) {
+                // this is good
+            }
+        }
+    }
+
+    private Record copyOf(Record record) {
+        ByteBuffer buffer = ByteBuffer.allocate(record.size());
+        buffer.put(record.buffer());
+        buffer.rewind();
+        record.buffer().rewind();
+        return new Record(buffer);
+    }
+
+    @Test
+    public void testEquality() {
+        assertEquals(record, copyOf(record));
+    }
+
+    @Parameters
+    public static Collection<Object[]> data() {
+        List<Object[]> values = new ArrayList<Object[]>();
+        for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes()))
+            for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes()))
+                for (CompressionType compression : CompressionType.values())
+                    values.add(new Object[] { key, value, compression });
+        return values;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java b/clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java
new file mode 100644
index 0000000..7662d38
--- /dev/null
+++ b/clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java
@@ -0,0 +1,54 @@
+package kafka.common.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.junit.Test;
+
+public class AbstractIteratorTest {
+
+    @Test
+    public void testIterator() {
+        int max = 10;
+        List<Integer> l = new ArrayList<Integer>();
+        for (int i = 0; i < max; i++)
+            l.add(i);
+        ListIterator<Integer> iter = new ListIterator<Integer>(l);
+        for (int i = 0; i < max; i++) {
+            Integer value = i;
+            assertEquals(value, iter.peek());
+            assertTrue(iter.hasNext());
+            assertEquals(value, iter.next());
+        }
+        assertFalse(iter.hasNext());
+    }
+
+    @Test(expected = NoSuchElementException.class)
+    public void testEmptyIterator() {
+        Iterator<Object> iter = new ListIterator<Object>(Arrays.asList());
+        iter.next();
+    }
+
+    class ListIterator<T> extends AbstractIterator<T> {
+        private List<T> list;
+        private int position = 0;
+
+        public ListIterator(List<T> l) {
+            this.list = l;
+        }
+
+        public T makeNext() {
+            if (position < list.size())
+                return list.get(position++);
+            else
+                return allDone();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/utils/MockTime.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/utils/MockTime.java b/clients/src/test/java/kafka/common/utils/MockTime.java
new file mode 100644
index 0000000..095d4f6
--- /dev/null
+++ b/clients/src/test/java/kafka/common/utils/MockTime.java
@@ -0,0 +1,28 @@
+package kafka.common.utils;
+
+import java.util.concurrent.TimeUnit;
+
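+/**
+ * A Time implementation for tests whose clock advances only when sleep() is called
+ */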
+public class MockTime implements Time {
+
+    private long nanos = 0;
+
+    public MockTime() {
+        this.nanos = System.nanoTime();
+    }
+
+    @Override
+    public long milliseconds() {
+        return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS);
+    }
+
+    @Override
+    public long nanoseconds() {
+        return nanos;
+    }
+
+    @Override
+    public void sleep(long ms) {
+        this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/test/MetricsBench.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/test/MetricsBench.java b/clients/src/test/java/kafka/test/MetricsBench.java
new file mode 100644
index 0000000..2b164bd
--- /dev/null
+++ b/clients/src/test/java/kafka/test/MetricsBench.java
@@ -0,0 +1,38 @@
+package kafka.test;
+
+import java.util.Arrays;
+
+import kafka.common.metrics.Metrics;
+import kafka.common.metrics.Sensor;
+import kafka.common.metrics.stats.Avg;
+import kafka.common.metrics.stats.Count;
+import kafka.common.metrics.stats.Max;
+import kafka.common.metrics.stats.Percentile;
+import kafka.common.metrics.stats.Percentiles;
+import kafka.common.metrics.stats.Percentiles.BucketSizing;
+
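+/**
+ * A crude benchmark of sensor recording overhead; takes the number of iterations as its only argument
+ */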
+public class MetricsBench {
+
+    public static void main(String[] args) {
+        long iters = Long.parseLong(args[0]);
+        Metrics metrics = new Metrics();
+        Sensor parent = metrics.sensor("parent");
+        Sensor child = metrics.sensor("child", parent);
+        for (Sensor sensor : Arrays.asList(parent, child)) {
+            sensor.add(sensor.name() + ".avg", new Avg());
+            sensor.add(sensor.name() + ".count", new Count());
+            sensor.add(sensor.name() + ".max", new Max());
+            sensor.add(new Percentiles(1024,
+                                       0.0,
+                                       iters,
+                                       BucketSizing.CONSTANT,
+                                       new Percentile(sensor.name() + ".median", 50.0),
+                                       new Percentile(sensor.name() + ".p_99", 99.0)));
+        }
+        long start = System.nanoTime();
+        for (int i = 0; i < iters; i++)
+            child.record(i);
+        double elapsed = (System.nanoTime() - start) / (double) iters;
+        System.out.println(String.format("%.2f ns per metric recording.", elapsed));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/test/Microbenchmarks.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/test/Microbenchmarks.java b/clients/src/test/java/kafka/test/Microbenchmarks.java
new file mode 100644
index 0000000..a0ddecb
--- /dev/null
+++ b/clients/src/test/java/kafka/test/Microbenchmarks.java
@@ -0,0 +1,143 @@
+package kafka.test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import kafka.common.utils.CopyOnWriteMap;
+import kafka.common.utils.SystemTime;
+
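+/**
+ * Ad hoc microbenchmarks of clock reads, locking, and map implementations; takes the iteration count as its only argument
+ */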
+public class Microbenchmarks {
+
+    public static void main(String[] args) throws Exception {
+
+        final int iters = Integer.parseInt(args[0]);
+        double x = 1.0; // seed with a non-zero value so successive sqrt calls do real work
+        long start = System.nanoTime();
+        for (int i = 0; i < iters; i++)
+            x += Math.sqrt(x);
+        System.out.println(x);
+        System.out.println("sqrt: " + (System.nanoTime() - start) / (double) iters);
+
+        // test clocks
+        systemMillis(iters);
+        systemNanos(iters);
+        long total = 0;
+        start = System.nanoTime();
+        total += systemMillis(iters);
+        System.out.println("System.currentTimeMillis(): " + (System.nanoTime() - start) / iters);
+        start = System.nanoTime();
+        total += systemNanos(iters);
+        System.out.println("System.nanoTime(): " + (System.nanoTime() - start) / iters);
+        System.out.println(total);
+
+        // test random
+        int n = 0;
+        Random random = new Random();
+        start = System.nanoTime();
+        for (int i = 0; i < iters; i++) {
+            n += random.nextInt();
+        }
+        System.out.println(n);
+        System.out.println("random: " + (System.nanoTime() - start) / iters);
+
+        float[] floats = new float[1024];
+        for (int i = 0; i < floats.length; i++)
+            floats[i] = random.nextFloat();
+        Arrays.sort(floats);
+
+        int loc = 0;
+        start = System.nanoTime();
+        for (int i = 0; i < iters; i++)
+            loc += Arrays.binarySearch(floats, floats[i % floats.length]);
+        System.out.println(loc);
+        System.out.println("binary search: " + (System.nanoTime() - start) / iters);
+
+        final SystemTime time = new SystemTime();
+        final AtomicBoolean done = new AtomicBoolean(false);
+        final Object lock = new Object();
+        Thread t1 = new Thread() {
+            public void run() {
+                time.sleep(1);
+                int counter = 0;
+                long start = time.nanoseconds();
+                for (int i = 0; i < iters; i++) {
+                    synchronized (lock) {
+                        counter++;
+                    }
+                }
+                System.out.println("synchronized: " + ((System.nanoTime() - start) / iters));
+                System.out.println(counter);
+                done.set(true);
+            }
+        };
+
+        Thread t2 = new Thread() {
+            public void run() {
+                int counter = 0;
+                while (!done.get()) {
+                    time.sleep(1);
+                    synchronized (lock) {
+                        counter += 1;
+                    }
+                }
+                System.out.println("Counter: " + counter);
+            }
+        };
+
+        t1.start();
+        t2.start();
+        t1.join();
+        t2.join();
+
+        Map<String, Integer> values = new HashMap<String, Integer>();
+        for (int i = 0; i < 100; i++)
+            values.put(Integer.toString(i), i);
+        System.out.println("HashMap:");
+        benchMap(2, 1000000, values);
+        System.out.println("ConcurentHashMap:");
+        benchMap(2, 1000000, new ConcurrentHashMap<String, Integer>(values));
+        System.out.println("CopyOnWriteMap:");
+        benchMap(2, 1000000, new CopyOnWriteMap<String, Integer>(values));
+    }
+
+    private static void benchMap(int numThreads, final int iters, final Map<String, Integer> map) throws Exception {
+        final List<String> keys = new ArrayList<String>(map.keySet());
+        final List<Thread> threads = new ArrayList<Thread>();
+        for (int i = 0; i < numThreads; i++) {
+            threads.add(new Thread() {
+                public void run() {
+                    int sum = 0;
+                    long start = System.nanoTime();
+                    for (int j = 0; j < iters; j++)
+                        sum += map.get(keys.get(j % keys.size()));
+                    System.out.println("Map access time: " + ((System.nanoTime() - start) / (double) iters));
+                    System.out.println(sum);
+                }
+            });
+        }
+        for (Thread thread : threads)
+            thread.start();
+        for (Thread thread : threads)
+            thread.join();
+    }
+
+    private static long systemMillis(int iters) {
+        long total = 0;
+        for (int i = 0; i < iters; i++)
+            total += System.currentTimeMillis();
+        return total;
+    }
+
+    private static long systemNanos(int iters) {
+        long total = 0;
+        for (int i = 0; i < iters; i++)
+            total += System.nanoTime();
+        return total;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/test/MockSelector.java b/clients/src/test/java/kafka/test/MockSelector.java
new file mode 100644
index 0000000..15508f4
--- /dev/null
+++ b/clients/src/test/java/kafka/test/MockSelector.java
@@ -0,0 +1,87 @@
+package kafka.test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import kafka.common.network.NetworkReceive;
+import kafka.common.network.NetworkSend;
+import kafka.common.network.Selectable;
+import kafka.common.utils.Time;
+
+/**
+ * A fake selector to use for testing
+ */
+public class MockSelector implements Selectable {
+
+    private final Time time;
+    private final List<NetworkSend> completedSends = new ArrayList<NetworkSend>();
+    private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();
+    private final List<Integer> disconnected = new ArrayList<Integer>();
+    private final List<Integer> connected = new ArrayList<Integer>();
+
+    public MockSelector(Time time) {
+        this.time = time;
+    }
+
+    @Override
+    public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
+        this.connected.add(id);
+    }
+
+    @Override
+    public void disconnect(int id) {
+        this.disconnected.add(id);
+    }
+
+    @Override
+    public void wakeup() {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    public void clear() {
+        this.completedSends.clear();
+        this.completedReceives.clear();
+        this.disconnected.clear();
+        this.connected.clear();
+    }
+
+    @Override
+    public void poll(long timeout, List<NetworkSend> sends) throws IOException {
+        this.completedSends.addAll(sends);
+        time.sleep(timeout);
+    }
+
+    @Override
+    public List<NetworkSend> completedSends() {
+        return completedSends;
+    }
+
+    public void completeSend(NetworkSend send) {
+        this.completedSends.add(send);
+    }
+
+    @Override
+    public List<NetworkReceive> completedReceives() {
+        return completedReceives;
+    }
+
+    public void completeReceive(NetworkReceive receive) {
+        this.completedReceives.add(receive);
+    }
+
+    @Override
+    public List<Integer> disconnected() {
+        return disconnected;
+    }
+
+    @Override
+    public List<Integer> connected() {
+        return connected;
+    }
+
+}