You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/01/28 20:16:28 UTC
[2/7] KAFKA-1227 New producer!
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/utils/KafkaThread.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/KafkaThread.java b/clients/src/main/java/kafka/common/utils/KafkaThread.java
new file mode 100644
index 0000000..f830aba
--- /dev/null
+++ b/clients/src/main/java/kafka/common/utils/KafkaThread.java
@@ -0,0 +1,18 @@
+package kafka.common.utils;
+
+/**
+ * A wrapper for Thread that sets things up nicely
+ */
+public class KafkaThread extends Thread {
+
+ public KafkaThread(String name, Runnable runnable, boolean daemon) {
+ super(runnable, name);
+ setDaemon(daemon);
+ setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(Thread t, Throwable e) {
+ e.printStackTrace();
+ }
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/utils/SystemTime.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/SystemTime.java b/clients/src/main/java/kafka/common/utils/SystemTime.java
new file mode 100644
index 0000000..c8ca09c
--- /dev/null
+++ b/clients/src/main/java/kafka/common/utils/SystemTime.java
@@ -0,0 +1,26 @@
+package kafka.common.utils;
+
+/**
+ * A time implementation that uses the system clock and sleep call
+ */
+public class SystemTime implements Time {
+
+ @Override
+ public long milliseconds() {
+ return System.currentTimeMillis();
+ }
+
+ public long nanoseconds() {
+ return System.nanoTime();
+ }
+
+ /**
+  * Sleep for the given number of milliseconds. If the thread is interrupted while sleeping the method returns
+  * early and the thread's interrupt status is set again, so callers can still observe the interruption.
+  *
+  * @param ms The number of milliseconds to sleep
+  */
+ @Override
+ public void sleep(long ms) {
+     try {
+         Thread.sleep(ms);
+     } catch (InterruptedException e) {
+         // Thread.sleep clears the interrupt flag when it throws; re-assert it instead of silently dropping it
+         Thread.currentThread().interrupt();
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/utils/Time.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/Time.java b/clients/src/main/java/kafka/common/utils/Time.java
new file mode 100644
index 0000000..390d16f
--- /dev/null
+++ b/clients/src/main/java/kafka/common/utils/Time.java
@@ -0,0 +1,23 @@
+package kafka.common.utils;
+
+/**
+ * An interface abstracting the clock to use in unit testing classes that make use of clock time
+ */
+public interface Time {
+
+ /**
+ * The current time in milliseconds
+ */
+ public long milliseconds();
+
+ /**
+ * The current time in nanoseconds
+ */
+ public long nanoseconds();
+
+ /**
+ * Sleep for the given number of milliseconds
+ */
+ public void sleep(long ms);
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/Utils.java b/clients/src/main/java/kafka/common/utils/Utils.java
new file mode 100644
index 0000000..f132771
--- /dev/null
+++ b/clients/src/main/java/kafka/common/utils/Utils.java
@@ -0,0 +1,230 @@
+package kafka.common.utils;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+
+import kafka.common.KafkaException;
+
+public class Utils {
+
+ /**
+ * Turn the given UTF8 byte array into a string
+ *
+ * @param bytes The byte array
+ * @return The string
+ */
+ public static String utf8(byte[] bytes) {
+ try {
+ return new String(bytes, "UTF8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("This shouldn't happen.", e);
+ }
+ }
+
+ /**
+ * Turn a string into a utf8 byte[]
+ *
+ * @param string The string
+ * @return The byte[]
+ */
+ public static byte[] utf8(String string) {
+ try {
+ return string.getBytes("UTF8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("This shouldn't happen.", e);
+ }
+ }
+
+ /**
+ * Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes
+ *
+ * @param buffer The buffer to read from
+ * @return The integer read, as a long to avoid signedness
+ */
+ public static long readUnsignedInt(ByteBuffer buffer) {
+ return buffer.getInt() & 0xffffffffL;
+ }
+
+ /**
+ * Read an unsigned integer from the given position without modifying the buffer's position
+ *
+ * @param buffer the buffer to read from
+ * @param index the index from which to read the integer
+ * @return The integer read, as a long to avoid signedness
+ */
+ public static long readUnsignedInt(ByteBuffer buffer, int index) {
+ return buffer.getInt(index) & 0xffffffffL;
+ }
+
+ /**
+  * Write the given long value as a 4 byte unsigned integer at the buffer's current position, advancing the
+  * position by 4 bytes. Overflow is ignored: only the low 32 bits of the value are written.
+  *
+  * @param buffer The buffer to write to
+  * @param value The value to write
+  */
+ public static void writeUnsignedInt(ByteBuffer buffer, long value) {
+     buffer.putInt((int) (value & 0xffffffffL));
+ }
+
+ /**
+  * Misspelled alias of {@link #writeUnsignedInt(ByteBuffer, long)}, kept so that existing callers keep compiling.
+  *
+  * @param buffer The buffer to write to
+  * @param value The value to write
+  * @deprecated use {@link #writeUnsignedInt(ByteBuffer, long)} instead
+  */
+ @Deprecated
+ public static void writetUnsignedInt(ByteBuffer buffer, long value) {
+     writeUnsignedInt(buffer, value);
+ }
+
+ /**
+ * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+ *
+ * @param buffer The buffer to write to
+ * @param index The position in the buffer at which to begin writing
+ * @param value The value to write
+ */
+ public static void writeUnsignedInt(ByteBuffer buffer, int index, long value) {
+ buffer.putInt(index, (int) (value & 0xffffffffL));
+ }
+
+ /**
+ * Compute the CRC32 of the byte array
+ *
+ * @param bytes The array to compute the checksum for
+ * @return The CRC32
+ */
+ public static long crc32(byte[] bytes) {
+ return crc32(bytes, 0, bytes.length);
+ }
+
+ /**
+ * Compute the CRC32 of the segment of the byte array given by the specified size and offset
+ *
+ * @param bytes The bytes to checksum
+ * @param offset the offset at which to begin checksumming
+ * @param size the number of bytes to checksum
+ * @return The CRC32
+ */
+ public static long crc32(byte[] bytes, int offset, int size) {
+ Crc32 crc = new Crc32();
+ crc.update(bytes, offset, size);
+ return crc.getValue();
+ }
+
+ /**
+  * Get the absolute value of the given number. If the number is Integer.MIN_VALUE return 0. This is different from
+  * java.lang.Math.abs or scala.math.abs in that they return Integer.MIN_VALUE (!) for that input.
+  *
+  * @param n The number
+  * @return The absolute value of n, or 0 if n is Integer.MIN_VALUE; the result is never negative
+  */
+ public static int abs(int n) {
+     // Integer.MIN_VALUE has no positive counterpart in two's complement, so Math.abs would return it unchanged.
+     // Masking off the sign bit is not a substitute: it would turn -1 into Integer.MAX_VALUE.
+     return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
+ }
+
+ /**
+  * Compute how many bytes the given character sequence would occupy when encoded as UTF-8, without actually
+  * encoding it
+  *
+  * @param s The string to calculate the length for
+  * @return The length when serialized
+  */
+ public static int utf8Length(CharSequence s) {
+     final int chars = s.length();
+     int bytes = 0;
+     int idx = 0;
+     while (idx < chars) {
+         final char c = s.charAt(idx);
+         if (Character.isHighSurrogate(c)) {
+             // a high surrogate is counted as a 4 byte sequence and the following char is skipped
+             bytes += 4;
+             idx += 2;
+             continue;
+         }
+         if (c <= 0x7F)
+             bytes += 1;
+         else if (c <= 0x7FF)
+             bytes += 2;
+         else
+             bytes += 3;
+         idx++;
+     }
+     return bytes;
+ }
+
+ /**
+ * Read the given byte buffer into a byte array
+ */
+ public static byte[] toArray(ByteBuffer buffer) {
+ return toArray(buffer, 0, buffer.limit());
+ }
+
+ /**
+ * Read a byte array from the given offset and size in the buffer
+ */
+ public static byte[] toArray(ByteBuffer buffer, int offset, int size) {
+ byte[] dest = new byte[size];
+ if (buffer.hasArray()) {
+ System.arraycopy(buffer.array(), buffer.arrayOffset() + offset, dest, 0, size);
+ } else {
+ int pos = buffer.position();
+ buffer.get(dest);
+ buffer.position(pos);
+ }
+ return dest;
+ }
+
+ /**
+ * Check that the parameter t is not null
+ *
+ * @param t The object to check
+ * @return t if it isn't null
+ * @throws NullPointerException if t is null.
+ */
+ public static <T> T notNull(T t) {
+ if (t == null)
+ throw new NullPointerException();
+ else
+ return t;
+ }
+
+ /**
+ * Instantiate the class
+ */
+ public static Object newInstance(Class<?> c) {
+ try {
+ return c.newInstance();
+ } catch (IllegalAccessException e) {
+ throw new KafkaException("Could not instantiate class " + c.getName(), e);
+ } catch (InstantiationException e) {
+ throw new KafkaException("Could not instantiate class " + c.getName() + " Does it have a public no-argument constructor?", e);
+ }
+ }
+
+ /**
+ * Generates 32 bit murmur2 hash from byte array
+ * @param data byte array to hash
+ * @return 32 bit hash of the given array
+ */
+ public static int murmur2(final byte[] data) {
+ int length = data.length;
+ int seed = 0x9747b28c;
+ // 'm' and 'r' are mixing constants generated offline.
+ // They're not really 'magic', they just happen to work well.
+ final int m = 0x5bd1e995;
+ final int r = 24;
+
+ // Initialize the hash to a random value
+ int h = seed ^ length;
+ int length4 = length / 4;
+
+ for (int i = 0; i < length4; i++) {
+ final int i4 = i * 4;
+ int k = (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
+ k *= m;
+ k ^= k >>> r;
+ k *= m;
+ h *= m;
+ h ^= k;
+ }
+
+ // Handle the last few bytes of the input array
+ switch (length % 4) {
+ case 3:
+ h ^= (data[(length & ~3) + 2] & 0xff) << 16;
+ case 2:
+ h ^= (data[(length & ~3) + 1] & 0xff) << 8;
+ case 1:
+ h ^= (data[length & ~3] & 0xff);
+ h *= m;
+ }
+
+ h ^= h >>> 13;
+ h *= m;
+ h ^= h >>> 15;
+
+ return h;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/clients/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/common/network/SelectorTest.java b/clients/src/test/java/kafka/clients/common/network/SelectorTest.java
new file mode 100644
index 0000000..68bc9ee
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/common/network/SelectorTest.java
@@ -0,0 +1,292 @@
+package kafka.clients.common.network;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import kafka.common.network.NetworkReceive;
+import kafka.common.network.NetworkSend;
+import kafka.common.network.Selectable;
+import kafka.common.network.Selector;
+import kafka.common.utils.Utils;
+import kafka.test.TestUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A set of tests for the selector. These use a test harness that runs a simple socket server that echos back responses.
+ */
+public class SelectorTest {
+
+ private static final List<NetworkSend> EMPTY = new ArrayList<NetworkSend>();
+ private static final int BUFFER_SIZE = 4 * 1024;
+
+ private EchoServer server;
+ private Selectable selector;
+
+ @Before
+ public void setup() throws Exception {
+ this.server = new EchoServer();
+ this.server.start();
+ this.selector = new Selector();
+ }
+
+ @After
+ public void teardown() throws Exception {
+ this.selector.close();
+ this.server.close();
+ }
+
+ /**
+ * Validate that when the server disconnects, a client send ends up with that node in the disconnected list.
+ */
+ @Test
+ public void testServerDisconnect() throws Exception {
+ int node = 0;
+
+ // connect and do a simple request
+ blockingConnect(node);
+ assertEquals("hello", blockingRequest(node, "hello"));
+
+ // disconnect
+ this.server.closeConnections();
+ while (!selector.disconnected().contains(node))
+ selector.poll(1000L, EMPTY);
+
+ // reconnect and do another request
+ blockingConnect(node);
+ assertEquals("hello", blockingRequest(node, "hello"));
+ }
+
+ /**
+ * Validate that the client can intentionally disconnect and reconnect
+ */
+ @Test
+ public void testClientDisconnect() throws Exception {
+ int node = 0;
+ blockingConnect(node);
+ selector.disconnect(node);
+ selector.poll(10, asList(createSend(node, "hello1")));
+ assertEquals("Request should not have succeeded", 0, selector.completedSends().size());
+ assertEquals("There should be a disconnect", 1, selector.disconnected().size());
+ assertTrue("The disconnect should be from our node", selector.disconnected().contains(node));
+ blockingConnect(node);
+ assertEquals("hello2", blockingRequest(node, "hello2"));
+ }
+
+ /**
+ * Sending a request with one already in flight should result in an exception
+ */
+ @Test(expected = IllegalStateException.class)
+ public void testCantSendWithInProgress() throws Exception {
+ int node = 0;
+ blockingConnect(node);
+ selector.poll(1000L, asList(createSend(node, "test1"), createSend(node, "test2")));
+ }
+
+ /**
+ * Sending a request to a node without an existing connection should result in an exception
+ */
+ @Test(expected = IllegalStateException.class)
+ public void testCantSendWithoutConnecting() throws Exception {
+ selector.poll(1000L, asList(createSend(0, "test")));
+ }
+
+ /**
+ * Sending a request to a node with a bad hostname should result in an exception during connect
+ */
+ @Test(expected = UnresolvedAddressException.class)
+ public void testNoRouteToHost() throws Exception {
+ selector.connect(0, new InetSocketAddress("asdf.asdf.dsc", server.port), BUFFER_SIZE, BUFFER_SIZE);
+ }
+
+ /**
+  * Connecting to a node that is not listening on the given port should result in that node being reported as
+  * disconnected
+  */
+ @Test
+ public void testConnectionRefused() throws Exception {
+     int node = 0;
+     // NOTE(review): assumes TestUtils.choosePort() returns a port nothing is listening on - confirm
+     selector.connect(node, new InetSocketAddress("localhost", TestUtils.choosePort()), BUFFER_SIZE, BUFFER_SIZE);
+     // Poll until the failed connection shows up in the disconnected list. The condition must be negated:
+     // without the '!' the loop body never runs and the test passes without checking anything.
+     while (!selector.disconnected().contains(node))
+         selector.poll(1000L, EMPTY);
+ }
+
+ /**
+ * Send multiple requests to several connections in parallel. Validate that responses are received in the order that
+ * requests were sent.
+ */
+ @Test
+ public void testNormalOperation() throws Exception {
+ int conns = 5;
+ int reqs = 500;
+
+ // create connections
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ for (int i = 0; i < conns; i++)
+ selector.connect(i, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ // send echo requests and receive responses
+ int[] requests = new int[conns];
+ int[] responses = new int[conns];
+ int responseCount = 0;
+ List<NetworkSend> sends = new ArrayList<NetworkSend>();
+ for (int i = 0; i < conns; i++)
+ sends.add(createSend(i, i + "-" + 0));
+
+ // loop until we complete all requests
+ while (responseCount < conns * reqs) {
+ // do the i/o
+ selector.poll(0L, sends);
+
+ assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
+
+ // handle any responses we may have gotten
+ for (NetworkReceive receive : selector.completedReceives()) {
+ String[] pieces = asString(receive).split("-");
+ assertEquals("Should be in the form 'conn-counter'", 2, pieces.length);
+ assertEquals("Check the source", receive.source(), Integer.parseInt(pieces[0]));
+ assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position());
+ assertEquals("Check the request counter", responses[receive.source()], Integer.parseInt(pieces[1]));
+ responses[receive.source()]++; // increment the expected counter
+ responseCount++;
+ }
+
+ // prepare new sends for the next round
+ sends.clear();
+ for (NetworkSend send : selector.completedSends()) {
+ int dest = send.destination();
+ requests[dest]++;
+ if (requests[dest] < reqs)
+ sends.add(createSend(dest, dest + "-" + requests[dest]));
+ }
+ }
+ }
+
+ /**
+ * Validate that we can send and receive a message larger than the receive and send buffer size
+ */
+ @Test
+ public void testSendLargeRequest() throws Exception {
+ int node = 0;
+ blockingConnect(node);
+ String big = TestUtils.randomString(10 * BUFFER_SIZE);
+ assertEquals(big, blockingRequest(node, big));
+ }
+
+ /**
+ * Test sending an empty string
+ */
+ @Test
+ public void testEmptyRequest() throws Exception {
+ int node = 0;
+ blockingConnect(node);
+ assertEquals("", blockingRequest(node, ""));
+ }
+
+ private String blockingRequest(int node, String s) throws IOException {
+ selector.poll(1000L, asList(createSend(node, s)));
+ while (true) {
+ selector.poll(1000L, EMPTY);
+ for (NetworkReceive receive : selector.completedReceives())
+ if (receive.source() == node)
+ return asString(receive);
+ }
+ }
+
+ /* connect and wait for the connection to complete */
+ private void blockingConnect(int node) throws IOException {
+ selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+ while (!selector.connected().contains(node))
+ selector.poll(10000L, EMPTY);
+ }
+
+ private NetworkSend createSend(int node, String s) {
+ return new NetworkSend(node, ByteBuffer.wrap(s.getBytes()));
+ }
+
+ private String asString(NetworkReceive receive) {
+ return new String(Utils.toArray(receive.payload()));
+ }
+
+ /**
+ * A simple server that takes size delimited byte arrays and just echos them back to the sender.
+ */
+ static class EchoServer extends Thread {
+ public final int port;
+ private final ServerSocket serverSocket;
+ private final List<Thread> threads;
+ private final List<Socket> sockets;
+
+ public EchoServer() throws Exception {
+ this.port = TestUtils.choosePort();
+ this.serverSocket = new ServerSocket(port);
+ this.threads = Collections.synchronizedList(new ArrayList<Thread>());
+ this.sockets = Collections.synchronizedList(new ArrayList<Socket>());
+ }
+
+ public void run() {
+ try {
+ while (true) {
+ final Socket socket = serverSocket.accept();
+ sockets.add(socket);
+ Thread thread = new Thread() {
+ public void run() {
+ try {
+ DataInputStream input = new DataInputStream(socket.getInputStream());
+ DataOutputStream output = new DataOutputStream(socket.getOutputStream());
+ while (socket.isConnected() && !socket.isClosed()) {
+ int size = input.readInt();
+ byte[] bytes = new byte[size];
+ input.readFully(bytes);
+ output.writeInt(size);
+ output.write(bytes);
+ output.flush();
+ }
+ } catch (IOException e) {
+ // ignore
+ } finally {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ };
+ thread.start();
+ threads.add(thread);
+ }
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+
+ public void closeConnections() throws IOException {
+ for (Socket socket : sockets)
+ socket.close();
+ }
+
+ public void close() throws IOException, InterruptedException {
+ this.serverSocket.close();
+ closeConnections();
+ for (Thread t : threads)
+ t.join();
+ join();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/clients/producer/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/kafka/clients/producer/BufferPoolTest.java
new file mode 100644
index 0000000..70603c4
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/producer/BufferPoolTest.java
@@ -0,0 +1,170 @@
+package kafka.clients.producer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import kafka.clients.producer.internals.BufferPool;
+import kafka.test.TestUtils;
+
+import org.junit.Test;
+
+public class BufferPoolTest {
+
+ /**
+ * Test the simple non-blocking allocation paths
+ */
+ @Test
+ public void testSimple() throws Exception {
+ int totalMemory = 64 * 1024;
+ int size = 1024;
+ BufferPool pool = new BufferPool(totalMemory, size, false);
+ ByteBuffer buffer = pool.allocate(size);
+ assertEquals("Buffer size should equal requested size.", size, buffer.limit());
+ assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
+ assertEquals("Available memory should have shrunk", totalMemory - size, pool.availableMemory());
+ buffer.putInt(1);
+ buffer.flip();
+ pool.deallocate(buffer);
+ assertEquals("All memory should be available", totalMemory, pool.availableMemory());
+ assertEquals("But now some is on the free list", totalMemory - size, pool.unallocatedMemory());
+ buffer = pool.allocate(size);
+ assertEquals("Recycled buffer should be cleared.", 0, buffer.position());
+ assertEquals("Recycled buffer should be cleared.", buffer.capacity(), buffer.limit());
+ pool.deallocate(buffer);
+ assertEquals("All memory should be available", totalMemory, pool.availableMemory());
+ assertEquals("Still a single buffer on the free list", totalMemory - size, pool.unallocatedMemory());
+ buffer = pool.allocate(2 * size);
+ pool.deallocate(buffer);
+ assertEquals("All memory should be available", totalMemory, pool.availableMemory());
+ assertEquals("Non-standard size didn't go to the free list.", totalMemory - size, pool.unallocatedMemory());
+ }
+
+ /**
+ * Test that we cannot try to allocate more memory than we have in the whole pool
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
+ BufferPool pool = new BufferPool(1024, 512, true);
+ ByteBuffer buffer = pool.allocate(1024);
+ assertEquals(1024, buffer.limit());
+ pool.deallocate(buffer);
+ buffer = pool.allocate(1025);
+ }
+
+ @Test
+ public void testNonblockingMode() throws Exception {
+ BufferPool pool = new BufferPool(2, 1, false);
+ pool.allocate(1);
+ try {
+ pool.allocate(2);
+ fail("The buffer allocated more than it has!");
+ } catch (BufferExhaustedException e) {
+ // this is good
+ }
+ }
+
+ /**
+ * Test that delayed allocation blocks
+ */
+ @Test
+ public void testDelayedAllocation() throws Exception {
+ BufferPool pool = new BufferPool(5 * 1024, 1024, true);
+ ByteBuffer buffer = pool.allocate(1024);
+ CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
+ CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
+ assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1, allocation.getCount());
+ doDealloc.countDown(); // return the memory
+ allocation.await();
+ }
+
+ private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ new Thread() {
+ public void run() {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ pool.deallocate(buffer);
+ }
+ }.start();
+ return latch;
+ }
+
+ private CountDownLatch asyncAllocate(final BufferPool pool, final int size) {
+ final CountDownLatch completed = new CountDownLatch(1);
+ new Thread() {
+ public void run() {
+ try {
+ pool.allocate(size);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ completed.countDown();
+ }
+ }
+ }.start();
+ return completed;
+ }
+
+ /**
+ * This test creates lots of threads that hammer on the pool
+ */
+ @Test
+ public void testStressfulSituation() throws Exception {
+ int numThreads = 10;
+ final int iterations = 50000;
+ final int poolableSize = 1024;
+ final int totalMemory = numThreads / 2 * poolableSize;
+ final BufferPool pool = new BufferPool(totalMemory, poolableSize, true);
+ List<StressTestThread> threads = new ArrayList<StressTestThread>();
+ for (int i = 0; i < numThreads; i++)
+ threads.add(new StressTestThread(pool, iterations));
+ for (StressTestThread thread : threads)
+ thread.start();
+ for (StressTestThread thread : threads)
+ thread.join();
+ for (StressTestThread thread : threads)
+ assertTrue("Thread should have completed all iterations successfully.", thread.success.get());
+ assertEquals(totalMemory, pool.availableMemory());
+ }
+
+ public static class StressTestThread extends Thread {
+ private final int iterations;
+ private final BufferPool pool;
+ public final AtomicBoolean success = new AtomicBoolean(false);
+
+ public StressTestThread(BufferPool pool, int iterations) {
+ this.iterations = iterations;
+ this.pool = pool;
+ }
+
+ public void run() {
+ try {
+ for (int i = 0; i < iterations; i++) {
+ int size;
+ if (TestUtils.random.nextBoolean())
+ // allocate poolable size
+ size = pool.poolableSize();
+ else
+ // allocate a random size
+ size = TestUtils.random.nextInt((int) pool.totalMemory());
+ ByteBuffer buffer = pool.allocate(size);
+ pool.deallocate(buffer);
+ }
+ success.set(true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/clients/producer/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/kafka/clients/producer/MetadataTest.java
new file mode 100644
index 0000000..68e4bd7
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/producer/MetadataTest.java
@@ -0,0 +1,55 @@
+package kafka.clients.producer;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import kafka.clients.producer.internals.Metadata;
+import kafka.common.Cluster;
+import kafka.common.Node;
+import kafka.common.PartitionInfo;
+
+import org.junit.Test;
+
+public class MetadataTest {
+
+ private long refreshBackoffMs = 100;
+ private long metadataExpireMs = 1000;
+ private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
+
+ @Test
+ public void testMetadata() throws Exception {
+ long time = 0;
+ metadata.update(Cluster.empty(), time);
+ assertFalse("No update needed.", metadata.needsUpdate(time));
+ metadata.forceUpdate();
+ assertFalse("Still no updated needed due to backoff", metadata.needsUpdate(time));
+ time += refreshBackoffMs;
+ assertTrue("Update needed now that backoff time expired", metadata.needsUpdate(time));
+ String topic = "my-topic";
+ Thread t1 = asyncFetch(topic);
+ Thread t2 = asyncFetch(topic);
+ assertTrue("Awaiting update", t1.isAlive());
+ assertTrue("Awaiting update", t2.isAlive());
+ metadata.update(clusterWith(topic), time);
+ t1.join();
+ t2.join();
+ assertFalse("No update needed.", metadata.needsUpdate(time));
+ time += metadataExpireMs;
+ assertTrue("Update needed due to stale metadata.", metadata.needsUpdate(time));
+ }
+
+ private Cluster clusterWith(String topic) {
+ return new Cluster(asList(new Node(0, "localhost", 1969)), asList(new PartitionInfo(topic, 0, 0, new int[0], new int[0])));
+ }
+
+ private Thread asyncFetch(final String topic) {
+ Thread thread = new Thread() {
+ public void run() {
+ metadata.fetch(topic, Integer.MAX_VALUE);
+ }
+ };
+ thread.start();
+ return thread;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
new file mode 100644
index 0000000..61929a4
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
@@ -0,0 +1,66 @@
+package kafka.clients.producer;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import kafka.common.Cluster;
+import kafka.common.Node;
+import kafka.common.PartitionInfo;
+import kafka.common.Serializer;
+import kafka.common.StringSerialization;
+
+import org.junit.Test;
+
+public class MockProducerTest {
+
+ @Test
+ public void testAutoCompleteMock() {
+ MockProducer producer = new MockProducer(true);
+ ProducerRecord record = new ProducerRecord("topic", "key", "value");
+ RecordSend send = producer.send(record);
+ assertTrue("Send should be immediately complete", send.completed());
+ assertFalse("Send should be successful", send.hasError());
+ assertEquals("Offset should be 0", 0, send.offset());
+ assertEquals("We should have the record in our history", asList(record), producer.history());
+ producer.clear();
+ assertEquals("Clear should erase our history", 0, producer.history().size());
+ }
+
+ /**
+  * With auto-completion turned off, sends stay incomplete until the test explicitly completes or fails them.
+  * The method needs the Test annotation, otherwise JUnit 4 never runs it.
+  */
+ @Test
+ public void testManualCompletion() {
+     MockProducer producer = new MockProducer(false);
+     ProducerRecord record1 = new ProducerRecord("topic", "key1", "value1");
+     ProducerRecord record2 = new ProducerRecord("topic", "key2", "value2");
+     RecordSend send1 = producer.send(record1);
+     assertFalse("Send shouldn't have completed", send1.completed());
+     RecordSend send2 = producer.send(record2);
+     assertFalse("Send shouldn't have completed", send2.completed());
+     assertTrue("Complete the first request", producer.completeNext());
+     assertFalse("Requst should be successful", send1.hasError());
+     assertFalse("Second request still incomplete", send2.completed());
+     IllegalArgumentException e = new IllegalArgumentException("blah");
+     assertTrue("Complete the second request with an error", producer.errorNext(e));
+     try {
+         send2.await();
+         fail("Expected error to be thrown");
+     } catch (IllegalArgumentException err) {
+         // this is good
+     }
+     assertFalse("No more requests to complete", producer.completeNext());
+ }
+
+ /**
+  * A mock producer built with a serializer, a partitioner and a one-partition cluster should complete a send
+  * immediately when auto-completion is on. The method needs the Test annotation, otherwise JUnit 4 never runs it.
+  */
+ @Test
+ public void testSerializationAndPartitioning() {
+     Cluster cluster = new Cluster(asList(new Node(0, "host", -1)), asList(new PartitionInfo("topic",
+                                                                                              0,
+                                                                                              0,
+                                                                                              new int[] { 0 },
+                                                                                              new int[] { 0 })));
+     Serializer serializer = new StringSerialization();
+     Partitioner partitioner = new DefaultPartitioner();
+     MockProducer producer = new MockProducer(serializer, serializer, partitioner, cluster, true);
+     ProducerRecord record = new ProducerRecord("topic", "key", "value");
+     RecordSend send = producer.send(record);
+     assertTrue("Send should be immediately complete", send.completed());
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java
new file mode 100644
index 0000000..b1ab361
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java
@@ -0,0 +1,135 @@
+package kafka.clients.producer;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import kafka.clients.producer.internals.RecordAccumulator;
+import kafka.clients.producer.internals.RecordBatch;
+import kafka.common.TopicPartition;
+import kafka.common.metrics.Metrics;
+import kafka.common.record.CompressionType;
+import kafka.common.record.LogEntry;
+import kafka.common.record.Record;
+import kafka.common.record.Records;
+import kafka.common.utils.MockTime;
+
+import org.junit.Test;
+
+public class RecordAccumulatorTest {
+
+ private TopicPartition tp = new TopicPartition("test", 0);
+ private MockTime time = new MockTime();
+ private byte[] key = "key".getBytes();
+ private byte[] value = "value".getBytes();
+ private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
+ private Metrics metrics = new Metrics(time);
+
+ @Test
+ public void testFull() throws Exception {
+ long now = time.milliseconds();
+ RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, false, metrics, time);
+ int appends = 1024 / msgSize;
+ for (int i = 0; i < appends; i++) {
+ accum.append(tp, key, value, CompressionType.NONE, null);
+ assertEquals("No partitions should be ready.", 0, accum.ready(now).size());
+ }
+ accum.append(tp, key, value, CompressionType.NONE, null);
+ assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
+ List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE);
+ assertEquals(1, batches.size());
+ RecordBatch batch = batches.get(0);
+ Iterator<LogEntry> iter = batch.records.iterator();
+ for (int i = 0; i < appends; i++) {
+ LogEntry entry = iter.next();
+ assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
+ assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
+ }
+ assertFalse("No more records", iter.hasNext());
+ }
+
+ @Test
+ public void testAppendLarge() throws Exception {
+ int batchSize = 512;
+ RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, false, metrics, time);
+ accum.append(tp, key, new byte[2 * batchSize], CompressionType.NONE, null);
+ assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
+ }
+
+ @Test
+ public void testLinger() throws Exception {
+ long lingerMs = 10L;
+ RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, false, metrics, time);
+ accum.append(tp, key, value, CompressionType.NONE, null);
+ assertEquals("No partitions should be ready", 0, accum.ready(time.milliseconds()).size());
+ time.sleep(10);
+ assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
+ List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE);
+ assertEquals(1, batches.size());
+ RecordBatch batch = batches.get(0);
+ Iterator<LogEntry> iter = batch.records.iterator();
+ LogEntry entry = iter.next();
+ assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
+ assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
+ assertFalse("No more records", iter.hasNext());
+ }
+
+ @Test
+ public void testPartialDrain() throws Exception {
+ RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, false, metrics, time);
+ int appends = 1024 / msgSize + 1;
+ List<TopicPartition> partitions = asList(new TopicPartition("test", 0), new TopicPartition("test", 1));
+ for (TopicPartition tp : partitions) {
+ for (int i = 0; i < appends; i++)
+ accum.append(tp, key, value, CompressionType.NONE, null);
+ }
+ assertEquals("Both partitions should be ready", 2, accum.ready(time.milliseconds()).size());
+
+ List<RecordBatch> batches = accum.drain(partitions, 1024);
+ assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
+ }
+
+ @Test
+ public void testStressfulSituation() throws Exception {
+ final int numThreads = 5;
+ final int msgs = 10000;
+ final int numParts = 10;
+ final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, true, metrics, time);
+ List<Thread> threads = new ArrayList<Thread>();
+ for (int i = 0; i < numThreads; i++) {
+ threads.add(new Thread() {
+ public void run() {
+ for (int i = 0; i < msgs; i++) {
+ try {
+ accum.append(new TopicPartition("test", i % numParts), key, value, CompressionType.NONE, null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ });
+ }
+ for (Thread t : threads)
+ t.start();
+ int read = 0;
+ long now = time.milliseconds();
+ while (read < numThreads * msgs) {
+ List<TopicPartition> tps = accum.ready(now);
+ List<RecordBatch> batches = accum.drain(tps, 5 * 1024);
+ for (RecordBatch batch : batches) {
+ for (LogEntry entry : batch.records)
+ read++;
+ }
+ accum.deallocate(batches);
+ }
+
+ for (Thread t : threads)
+ t.join();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
new file mode 100644
index 0000000..f8fd14b
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
@@ -0,0 +1,76 @@
+package kafka.clients.producer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.TimeUnit;
+
+import kafka.clients.producer.internals.ProduceRequestResult;
+import kafka.common.TopicPartition;
+import kafka.common.errors.CorruptMessageException;
+import kafka.common.errors.TimeoutException;
+
+import org.junit.Test;
+
+public class RecordSendTest {
+
+ private TopicPartition topicPartition = new TopicPartition("test", 0);
+ private long baseOffset = 45;
+ private long relOffset = 5;
+
+ /**
+ * Test that waiting on a request that never completes times out
+ */
+ @Test
+ public void testTimeout() {
+ ProduceRequestResult request = new ProduceRequestResult();
+ RecordSend send = new RecordSend(relOffset, request);
+ assertFalse("Request is not completed", send.completed());
+ try {
+ send.await(5, TimeUnit.MILLISECONDS);
+ fail("Should have thrown exception.");
+ } catch (TimeoutException e) { /* this is good */
+ }
+
+ request.done(topicPartition, baseOffset, null);
+ assertTrue(send.completed());
+ assertEquals(baseOffset + relOffset, send.offset());
+ }
+
+ /**
+ * Test that an asynchronous request will eventually throw the right exception
+ */
+ @Test(expected = CorruptMessageException.class)
+ public void testError() {
+ RecordSend send = new RecordSend(relOffset, asyncRequest(baseOffset, new CorruptMessageException(), 50L));
+ send.await();
+ }
+
+ /**
+ * Test that an asynchronous request will eventually return the right offset
+ */
+ @Test
+ public void testBlocking() {
+ RecordSend send = new RecordSend(relOffset, asyncRequest(baseOffset, null, 50L));
+ assertEquals(baseOffset + relOffset, send.offset());
+ }
+
+ /* create a new request result that will be completed after the given timeout */
+ public ProduceRequestResult asyncRequest(final long baseOffset, final RuntimeException error, final long timeout) {
+ final ProduceRequestResult request = new ProduceRequestResult();
+ new Thread() {
+ public void run() {
+ try {
+ sleep(timeout);
+ request.done(topicPartition, baseOffset, error);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+ return request;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/SenderTest.java b/clients/src/test/java/kafka/clients/producer/SenderTest.java
new file mode 100644
index 0000000..73f1aba
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/producer/SenderTest.java
@@ -0,0 +1,92 @@
+package kafka.clients.producer;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+
+import kafka.clients.producer.internals.Metadata;
+import kafka.clients.producer.internals.RecordAccumulator;
+import kafka.clients.producer.internals.Sender;
+import kafka.common.Cluster;
+import kafka.common.Node;
+import kafka.common.PartitionInfo;
+import kafka.common.TopicPartition;
+import kafka.common.metrics.Metrics;
+import kafka.common.network.NetworkReceive;
+import kafka.common.protocol.ApiKeys;
+import kafka.common.protocol.Errors;
+import kafka.common.protocol.ProtoUtils;
+import kafka.common.protocol.types.Struct;
+import kafka.common.record.CompressionType;
+import kafka.common.requests.RequestSend;
+import kafka.common.requests.ResponseHeader;
+import kafka.common.utils.MockTime;
+import kafka.test.MockSelector;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class SenderTest {
+
+ private MockTime time = new MockTime();
+ private MockSelector selector = new MockSelector(time);
+ private int batchSize = 16 * 1024;
+ private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+ private Cluster cluster = new Cluster(asList(new Node(0, "localhost", 1969)), asList(new PartitionInfo("test",
+ 0,
+ 0,
+ new int[0],
+ new int[0])));
+ private Metrics metrics = new Metrics(time);
+ private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, false, metrics, time);
+ private Sender sender = new Sender(selector, metadata, this.accumulator, "", 1024 * 1024, 0L, (short) -1, 10000, time);
+
+ @Before
+ public void setup() {
+ metadata.update(cluster, time.milliseconds());
+ }
+
+ @Test
+ public void testSimple() throws Exception {
+ TopicPartition tp = new TopicPartition("test", 0);
+ RecordSend send = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+ sender.run(time.milliseconds());
+ assertEquals("We should have connected", 1, selector.connected().size());
+ selector.clear();
+ sender.run(time.milliseconds());
+ assertEquals("Single request should be sent", 1, selector.completedSends().size());
+ RequestSend request = (RequestSend) selector.completedSends().get(0);
+ selector.clear();
+ long offset = 42;
+ selector.completeReceive(produceResponse(request.header().correlationId(),
+ cluster.leaderFor(tp).id(),
+ tp.topic(),
+ tp.partition(),
+ offset,
+ Errors.NONE.code()));
+ sender.run(time.milliseconds());
+ assertTrue("Request should be completed", send.completed());
+ assertEquals(offset, send.offset());
+ }
+
+ private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) {
+ Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
+ Struct response = struct.instance("responses");
+ response.set("topic", topic);
+ Struct partResp = response.instance("partition_responses");
+ partResp.set("partition", part);
+ partResp.set("error_code", (short) error);
+ partResp.set("base_offset", offset);
+ response.set("partition_responses", new Object[] { partResp });
+ struct.set("responses", new Object[] { response });
+ ResponseHeader header = new ResponseHeader(correlation);
+ ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + struct.sizeOf());
+ header.writeTo(buffer);
+ struct.writeTo(buffer);
+ buffer.rewind();
+ return new NetworkReceive(source, buffer);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/kafka/common/config/ConfigDefTest.java
new file mode 100644
index 0000000..a6a91ac
--- /dev/null
+++ b/clients/src/test/java/kafka/common/config/ConfigDefTest.java
@@ -0,0 +1,88 @@
+package kafka.common.config;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import kafka.common.config.ConfigDef.Range;
+import kafka.common.config.ConfigDef.Type;
+
+import org.junit.Test;
+
+public class ConfigDefTest {
+
+ @Test
+ public void testBasicTypes() {
+ ConfigDef def = new ConfigDef().define("a", Type.INT, 5, Range.between(0, 14), "docs")
+ .define("b", Type.LONG, "docs")
+ .define("c", Type.STRING, "hello", "docs")
+ .define("d", Type.LIST, "docs")
+ .define("e", Type.DOUBLE, "docs")
+ .define("f", Type.CLASS, "docs")
+ .define("g", Type.BOOLEAN, "docs");
+
+ Properties props = new Properties();
+ props.put("a", "1 ");
+ props.put("b", 2);
+ props.put("d", " a , b, c");
+ props.put("e", 42.5d);
+ props.put("f", String.class.getName());
+ props.put("g", "true");
+
+ Map<String, Object> vals = def.parse(props);
+ assertEquals(1, vals.get("a"));
+ assertEquals(2L, vals.get("b"));
+ assertEquals("hello", vals.get("c"));
+ assertEquals(asList("a", "b", "c"), vals.get("d"));
+ assertEquals(42.5d, vals.get("e"));
+ assertEquals(String.class, vals.get("f"));
+ assertEquals(true, vals.get("g"));
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testInvalidDefault() {
+ new ConfigDef().define("a", Type.INT, "hello", "docs");
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testNullDefault() {
+ new ConfigDef().define("a", Type.INT, null, null, "docs");
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testMissingRequired() {
+ new ConfigDef().define("a", Type.INT, "docs").parse(new HashMap<String, Object>());
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testDefinedTwice() {
+ new ConfigDef().define("a", Type.STRING, "docs").define("a", Type.INT, "docs");
+ }
+
+ @Test
+ public void testBadInputs() {
+ testBadInputs(Type.INT, "hello", null, "42.5", 42.5, Long.MAX_VALUE, Long.toString(Long.MAX_VALUE), new Object());
+ testBadInputs(Type.LONG, "hello", null, "42.5", Long.toString(Long.MAX_VALUE) + "00", new Object());
+ testBadInputs(Type.DOUBLE, "hello", null, new Object());
+ testBadInputs(Type.STRING, new Object());
+ testBadInputs(Type.LIST, 53, new Object());
+ }
+
+ private void testBadInputs(Type type, Object... values) {
+ for (Object value : values) {
+ Map<String, Object> m = new HashMap<String, Object>();
+ m.put("name", value);
+ ConfigDef def = new ConfigDef().define("name", type, "docs");
+ try {
+ def.parse(m);
+ fail("Expected a config exception on bad input for value " + value);
+ } catch (ConfigException e) {
+ // this is good
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/metrics/JmxReporterTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/kafka/common/metrics/JmxReporterTest.java
new file mode 100644
index 0000000..e286261
--- /dev/null
+++ b/clients/src/test/java/kafka/common/metrics/JmxReporterTest.java
@@ -0,0 +1,21 @@
+package kafka.common.metrics;
+
+import kafka.common.metrics.stats.Avg;
+import kafka.common.metrics.stats.Total;
+
+import org.junit.Test;
+
+/**
+ * Smoke test for {@link JmxReporter}: registers metrics on two sensors while the reporter is attached. The test
+ * contains no assertions; it passes as long as nothing throws.
+ */
+public class JmxReporterTest {
+
+    @Test
+    public void testJmxRegistration() throws Exception {
+        Metrics registry = new Metrics();
+        registry.addReporter(new JmxReporter());
+
+        Sensor requests = registry.sensor("kafka.requests");
+        requests.add("pack.bean1.avg", new Avg());
+        requests.add("pack.bean2.total", new Total());
+
+        // the second sensor adds metrics under the same two "pack.beanN" prefixes
+        Sensor blah = registry.sensor("kafka.blah");
+        blah.add("pack.bean1.some", new Total());
+        blah.add("pack.bean2.some", new Total());
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/kafka/common/metrics/MetricsTest.java
new file mode 100644
index 0000000..7d06864
--- /dev/null
+++ b/clients/src/test/java/kafka/common/metrics/MetricsTest.java
@@ -0,0 +1,176 @@
+package kafka.common.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import kafka.common.metrics.stats.Avg;
+import kafka.common.metrics.stats.Count;
+import kafka.common.metrics.stats.Max;
+import kafka.common.metrics.stats.Min;
+import kafka.common.metrics.stats.Percentile;
+import kafka.common.metrics.stats.Percentiles;
+import kafka.common.metrics.stats.Percentiles.BucketSizing;
+import kafka.common.metrics.stats.Rate;
+import kafka.common.metrics.stats.Total;
+import kafka.common.utils.MockTime;
+
+import org.junit.Test;
+
+/**
+ * Tests for the {@link Metrics} registry and its {@link Sensor}s: the simple statistics, parent/child sensors,
+ * event and time windowing, duplicate metric names and quotas. All timing goes through a {@link MockTime}.
+ */
+public class MetricsTest {
+
+    private static double EPS = 0.000001;
+
+    MockTime time = new MockTime();
+    Metrics metrics = new Metrics(new MetricConfig(), Arrays.asList((MetricsReporter) new JmxReporter()), time);
+
+    /**
+     * Records 0..9 on one sensor and 5.0 on another, advances the mock clock by two seconds, and checks the value
+     * reported by each statistic.
+     */
+    @Test
+    public void testSimpleStats() throws Exception {
+        metrics.addMetric("direct.measurable", new ConstantMeasurable());
+
+        Sensor s = metrics.sensor("test.sensor");
+        s.add("test.avg", new Avg());
+        s.add("test.max", new Max());
+        s.add("test.min", new Min());
+        s.add("test.rate", new Rate(TimeUnit.SECONDS));
+        s.add("test.occurences", new Rate(TimeUnit.SECONDS, new Count()));
+        s.add("test.count", new Count());
+        Percentile median = new Percentile("test.median", 50.0);
+        Percentile perc999 = new Percentile("test.perc99_9", 99.9);
+        s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT, median, perc999));
+
+        Sensor s2 = metrics.sensor("test.sensor2");
+        s2.add("s2.total", new Total());
+        s2.record(5.0);
+
+        for (int i = 0; i < 10; i++)
+            s.record(i);
+
+        // pretend 2 seconds passed...
+        time.sleep(2000);
+
+        assertEquals("s2 reflects the constant value", 5.0, metricValue("s2.total"), EPS);
+        assertEquals("Avg(0...9) = 4.5", 4.5, metricValue("test.avg"), EPS);
+        assertEquals("Max(0...9) = 9", 9.0, metricValue("test.max"), EPS);
+        assertEquals("Min(0...9) = 0", 0.0, metricValue("test.min"), EPS);
+        assertEquals("Rate(0...9) = 22.5", 22.5, metricValue("test.rate"), EPS);
+        assertEquals("Occurences(0...9) = 5", 5.0, metricValue("test.occurences"), EPS);
+        assertEquals("Count(0...9) = 10", 10.0, metricValue("test.count"), EPS);
+    }
+
+    /**
+     * Builds a small sensor hierarchy (two parents, two children, one grandchild), records once on every sensor,
+     * and checks that each count equals one plus the counts of the sensors below it.
+     */
+    @Test
+    public void testHierarchicalSensors() {
+        Sensor parent1 = metrics.sensor("test.parent1");
+        parent1.add("test.parent1.count", new Count());
+        Sensor parent2 = metrics.sensor("test.parent2");
+        parent2.add("test.parent2.count", new Count());
+        Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
+        child1.add("test.child1.count", new Count());
+        Sensor child2 = metrics.sensor("test.child2", parent1);
+        child2.add("test.child2.count", new Count());
+        Sensor grandchild = metrics.sensor("test.grandchild", child1);
+        grandchild.add("test.grandchild.count", new Count());
+
+        /* increment each sensor one time */
+        for (Sensor sensor : new Sensor[] { parent1, parent2, child1, child2, grandchild })
+            sensor.record();
+
+        double p1 = parent1.metrics().get(0).value();
+        double p2 = parent2.metrics().get(0).value();
+        double c1 = child1.metrics().get(0).value();
+        double c2 = child2.metrics().get(0).value();
+        double gc = grandchild.metrics().get(0).value();
+
+        /* each metric should have a count equal to one + its children's count */
+        assertEquals(1.0, gc, EPS);
+        assertEquals(1.0 + gc, child1.metrics().get(0).value(), EPS);
+        assertEquals(1.0, c2, EPS);
+        assertEquals(1.0 + c1, p2, EPS);
+        assertEquals(1.0 + c1 + c2, p1, EPS);
+    }
+
+    /* a sensor whose two parents share a common parent must be rejected */
+    @Test(expected = IllegalArgumentException.class)
+    public void testBadSensorHiearchy() {
+        Sensor root = metrics.sensor("parent");
+        Sensor left = metrics.sensor("child1", root);
+        Sensor right = metrics.sensor("child2", root);
+        metrics.sensor("gc", left, right); // should fail
+    }
+
+    /**
+     * With an event window of 1 and 2 samples, a third recorded event leaves the measured count at 2.
+     */
+    @Test
+    public void testEventWindowing() {
+        Count counter = new Count();
+        MetricConfig windowing = new MetricConfig().eventWindow(1).samples(2);
+        counter.record(windowing, 1.0, time.nanoseconds());
+        counter.record(windowing, 1.0, time.nanoseconds());
+        assertEquals(2.0, counter.measure(windowing, time.nanoseconds()), EPS);
+        counter.record(windowing, 1.0, time.nanoseconds()); // first event times out
+        assertEquals(2.0, counter.measure(windowing, time.nanoseconds()), EPS);
+    }
+
+    /**
+     * With a 1 ms time window and 2 samples, a third event recorded 2 ms after the first leaves the measured
+     * count at 2.
+     */
+    @Test
+    public void testTimeWindowing() {
+        Count counter = new Count();
+        MetricConfig windowing = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS).samples(2);
+        counter.record(windowing, 1.0, time.nanoseconds());
+        time.sleep(1);
+        counter.record(windowing, 1.0, time.nanoseconds());
+        assertEquals(2.0, counter.measure(windowing, time.nanoseconds()), EPS);
+        time.sleep(1);
+        counter.record(windowing, 1.0, time.nanoseconds()); // oldest event times out
+        assertEquals(2.0, counter.measure(windowing, time.nanoseconds()), EPS);
+    }
+
+    /* a value recorded a full window ago no longer contributes: Max then measures as negative infinity */
+    @Test
+    public void testOldDataHasNoEffect() {
+        long windowMs = 100;
+        Max maximum = new Max();
+        MetricConfig windowing = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS);
+        maximum.record(windowing, 50, time.nanoseconds());
+        time.sleep(windowMs);
+        assertEquals(Double.NEGATIVE_INFINITY, maximum.measure(windowing, time.nanoseconds()), EPS);
+    }
+
+    /* registering the same metric name on two different sensors must be rejected */
+    @Test(expected = IllegalArgumentException.class)
+    public void testDuplicateMetricName() {
+        Sensor first = metrics.sensor("test");
+        first.add("test", new Avg());
+        Sensor second = metrics.sensor("test2");
+        second.add("test", new Total());
+    }
+
+    /**
+     * A sensor with an upper-bound quota of 5.0 on one total and a lower-bound quota of 0.0 on another must throw
+     * when a recording takes a total past its bound; the violating value is still included in the total.
+     */
+    @Test
+    public void testQuotas() {
+        Sensor sensor = metrics.sensor("test");
+        sensor.add("test1.total", new Total(), new MetricConfig().quota(Quota.lessThan(5.0)));
+        sensor.add("test2.total", new Total(), new MetricConfig().quota(Quota.moreThan(0.0)));
+        sensor.record(5.0);
+        recordExpectingViolation(sensor, 1.0);
+        assertEquals(6.0, metricValue("test1.total"), EPS);
+        sensor.record(-6.0);
+        recordExpectingViolation(sensor, -1.0);
+    }
+
+    /* current value of the registered metric with the given name */
+    private double metricValue(String name) {
+        return metrics.metrics().get(name).value();
+    }
+
+    /* record the amount and fail unless the sensor reports a quota violation */
+    private void recordExpectingViolation(Sensor sensor, double amount) {
+        try {
+            sensor.record(amount);
+        } catch (QuotaViolationException e) {
+            return; // this is good
+        }
+        fail("Should have gotten a quota violation.");
+    }
+
+    /* a measurable that always reports whatever is currently stored in its public value field */
+    public static class ConstantMeasurable implements Measurable {
+        public double value = 0.0;
+
+        @Override
+        public double measure(MetricConfig config, long now) {
+            return this.value;
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java b/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java
new file mode 100644
index 0000000..03bdd2b
--- /dev/null
+++ b/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java
@@ -0,0 +1,56 @@
+package kafka.common.metrics.stats;
+
+import static org.junit.Assert.assertEquals;
+import kafka.common.metrics.stats.Histogram.BinScheme;
+import kafka.common.metrics.stats.Histogram.ConstantBinScheme;
+import kafka.common.metrics.stats.Histogram.LinearBinScheme;
+
+import org.junit.Test;
+
+/**
+ * Tests for {@link Histogram} and its bin schemes. Only {@code testConstantBinScheme} carries an active
+ * {@code @Test} annotation: the one on {@code testHistogram} is commented out and {@code testLinearBinScheme}
+ * has none. NOTE(review): confirm whether those two are disabled on purpose.
+ */
+public class HistogramTest {
+
+    private static final double EPS = 0.0000001d;
+
+    /* record the integers -5..4 and compare each tenth quantile against the lower edge of the following bin */
+    // @Test
+    public void testHistogram() {
+        BinScheme bins = new ConstantBinScheme(12, -5, 5);
+        Histogram histogram = new Histogram(bins);
+        for (int sample = -5; sample < 5; sample++)
+            histogram.record(sample);
+        for (int tenth = 0; tenth < 10; tenth++) {
+            double quantile = tenth / 10.0;
+            assertEquals(bins.fromBin(tenth + 1), histogram.value(quantile), EPS);
+        }
+    }
+
+    /* out-of-range and boundary values map to the expected bins, and fromBin/toBin agree for every bin */
+    @Test
+    public void testConstantBinScheme() {
+        ConstantBinScheme fiveBins = new ConstantBinScheme(5, -5, 5);
+        assertEquals("A value below the lower bound should map to the first bin", 0, fiveBins.toBin(-5.01));
+        assertEquals("A value above the upper bound should map to the last bin", 4, fiveBins.toBin(5.01));
+        assertEquals("Check boundary of bucket 1", 1, fiveBins.toBin(-5));
+        assertEquals("Check boundary of bucket 4", 4, fiveBins.toBin(5));
+        assertEquals("Check boundary of bucket 3", 3, fiveBins.toBin(4.9999));
+        ConstantBinScheme fourBins = new ConstantBinScheme(4, 0, 5);
+        checkBinningConsistency(fourBins);
+        checkBinningConsistency(fiveBins);
+    }
+
+    /* print the value each bin maps back to, then check that fromBin/toBin agree for every bin */
+    public void testLinearBinScheme() {
+        LinearBinScheme linear = new LinearBinScheme(5, 5);
+        int bin = 0;
+        while (bin < linear.bins()) {
+            System.out.println(bin + " " + linear.fromBin(bin));
+            bin++;
+        }
+        checkBinningConsistency(linear);
+    }
+
+    /* for every bin of the scheme, the value returned by fromBin must be placed back into that bin by toBin */
+    private void checkBinningConsistency(BinScheme scheme) {
+        for (int bin = 0; bin < scheme.bins(); bin++) {
+            double fromBin = scheme.fromBin(bin);
+            int binAgain = scheme.toBin(fromBin);
+            String message = "unbinning and rebinning " + bin + " gave a different result (" + fromBin
+                             + " was placed in bin " + binAgain + " )";
+            assertEquals(message, bin, binAgain);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java
new file mode 100644
index 0000000..5204f3a
--- /dev/null
+++ b/clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -0,0 +1,96 @@
+package kafka.common.protocol.types;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class ProtocolSerializationTest {
+
+ private Schema schema;
+ private Struct struct;
+
+ @Before
+ public void setup() {
+ this.schema = new Schema(new Field("int8", Type.INT8),
+ new Field("int16", Type.INT16),
+ new Field("int32", Type.INT32),
+ new Field("int64", Type.INT64),
+ new Field("string", Type.STRING),
+ new Field("bytes", Type.BYTES),
+ new Field("array", new ArrayOf(Type.INT32)),
+ new Field("struct", new Schema(new Field("field", Type.INT32))));
+ this.struct = new Struct(this.schema).set("int8", (byte) 1)
+ .set("int16", (short) 1)
+ .set("int32", (int) 1)
+ .set("int64", (long) 1)
+ .set("string", "1")
+ .set("bytes", "1".getBytes())
+ .set("array", new Object[] { 1 });
+ this.struct.set("struct", this.struct.instance("struct").set("field", new Object[] { 1, 2, 3 }));
+ }
+
+ @Test
+ public void testSimple() {
+ check(Type.INT8, (byte) -111);
+ check(Type.INT16, (short) -11111);
+ check(Type.INT32, -11111111);
+ check(Type.INT64, -11111111111L);
+ check(Type.STRING, "");
+ check(Type.STRING, "hello");
+ check(Type.STRING, "A\u00ea\u00f1\u00fcC");
+ check(Type.BYTES, ByteBuffer.allocate(0));
+ check(Type.BYTES, ByteBuffer.wrap("abcd".getBytes()));
+ check(new ArrayOf(Type.INT32), new Object[] { 1, 2, 3, 4 });
+ check(new ArrayOf(Type.STRING), new Object[] {});
+ check(new ArrayOf(Type.STRING), new Object[] { "hello", "there", "beautiful" });
+ }
+
+ @Test
+ public void testNulls() {
+ for (Field f : this.schema.fields()) {
+ Object o = this.struct.get(f);
+ try {
+ this.struct.set(f, null);
+ this.struct.validate();
+ fail("Should not allow serialization of null value.");
+ } catch (SchemaException e) {
+ // this is good
+ this.struct.set(f, o);
+ }
+ }
+ }
+
+ @Test
+ public void testDefault() {
+ Schema schema = new Schema(new Field("field", Type.INT32, "doc", 42));
+ Struct struct = new Struct(schema);
+ assertEquals("Should get the default value", 42, struct.get("field"));
+ struct.validate(); // should be valid even with missing value
+ }
+
+ private Object roundtrip(Type type, Object obj) {
+ ByteBuffer buffer = ByteBuffer.allocate(type.sizeOf(obj));
+ type.write(buffer, obj);
+ assertFalse("The buffer should now be full.", buffer.hasRemaining());
+ buffer.rewind();
+ Object read = type.read(buffer);
+ assertFalse("All bytes should have been read.", buffer.hasRemaining());
+ return read;
+ }
+
+ private void check(Type type, Object obj) {
+ Object result = roundtrip(type, obj);
+ if (obj instanceof Object[]) {
+ obj = Arrays.asList((Object[]) obj);
+ result = Arrays.asList((Object[]) result);
+ }
+ assertEquals("The object read back should be the same as what was written.", obj, result);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/kafka/common/record/MemoryRecordsTest.java
new file mode 100644
index 0000000..6906309
--- /dev/null
+++ b/clients/src/test/java/kafka/common/record/MemoryRecordsTest.java
@@ -0,0 +1,44 @@
+package kafka.common.record;
+
+import static kafka.common.utils.Utils.toArray;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Test;
+
+public class MemoryRecordsTest {
+
+ @Test
+ public void testIterator() {
+ MemoryRecords recs1 = new MemoryRecords(ByteBuffer.allocate(1024));
+ MemoryRecords recs2 = new MemoryRecords(ByteBuffer.allocate(1024));
+ List<Record> list = Arrays.asList(new Record("a".getBytes(), "1".getBytes()),
+ new Record("b".getBytes(), "2".getBytes()),
+ new Record("c".getBytes(), "3".getBytes()));
+ for (int i = 0; i < list.size(); i++) {
+ Record r = list.get(i);
+ recs1.append(i, r);
+ recs2.append(i, toArray(r.key()), toArray(r.value()), r.compressionType());
+ }
+
+ for (int iteration = 0; iteration < 2; iteration++) {
+ for (MemoryRecords recs : Arrays.asList(recs1, recs2)) {
+ Iterator<LogEntry> iter = recs.iterator();
+ for (int i = 0; i < list.size(); i++) {
+ assertTrue(iter.hasNext());
+ LogEntry entry = iter.next();
+ assertEquals((long) i, entry.offset());
+ assertEquals(list.get(i), entry.record());
+ }
+ assertFalse(iter.hasNext());
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/record/RecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/record/RecordTest.java b/clients/src/test/java/kafka/common/record/RecordTest.java
new file mode 100644
index 0000000..9c59c9b
--- /dev/null
+++ b/clients/src/test/java/kafka/common/record/RecordTest.java
@@ -0,0 +1,87 @@
+package kafka.common.record;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Exercises {@link Record} once for every combination of key, value and compression type produced by
+ * {@link #data()}.
+ */
+@RunWith(value = Parameterized.class)
+public class RecordTest {
+
+    private final ByteBuffer key;
+    private final ByteBuffer value;
+    private final CompressionType compression;
+    private final Record record;
+
+    /**
+     * @param key the record key, or null for a record without a key
+     * @param value the record value, or null
+     * @param compression the compression type to build the record with
+     */
+    public RecordTest(byte[] key, byte[] value, CompressionType compression) {
+        this.key = key == null ? null : ByteBuffer.wrap(key);
+        this.value = value == null ? null : ByteBuffer.wrap(value);
+        this.compression = compression;
+        this.record = new Record(key, value, compression);
+    }
+
+    /**
+     * The accessors must report exactly what the record was constructed from.
+     */
+    @Test
+    public void testFields() {
+        assertEquals(compression, record.compressionType());
+        assertEquals(key != null, record.hasKey());
+        assertEquals(key, record.key());
+        if (key != null)
+            assertEquals(key.limit(), record.keySize());
+        assertEquals(Record.CURRENT_MAGIC_VALUE, record.magic());
+        assertEquals(value, record.value());
+        if (value != null)
+            assertEquals(value.limit(), record.valueSize());
+    }
+
+    /**
+     * A freshly built record must be valid, and overwriting any single byte after the CRC field must make a copy of
+     * it invalid.
+     */
+    @Test
+    public void testChecksum() {
+        assertEquals(record.checksum(), record.computeChecksum());
+        assertTrue(record.isValid());
+        for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) {
+            // corrupt one byte of a private copy so the record under test stays intact
+            Record copy = copyOf(record);
+            copy.buffer().put(i, (byte) 69);
+            assertFalse(copy.isValid());
+            try {
+                copy.ensureValid();
+                fail("Should fail the above test.");
+            } catch (InvalidRecordException e) {
+                // this is good
+            }
+        }
+    }
+
+    /**
+     * Returns a new record backed by its own buffer holding the same bytes as {@code record}.
+     *
+     * The bytes are read through a duplicate of the record's buffer, so neither the content nor the position of
+     * the original is disturbed. (The direction of the transfer matters: {@code ByteBuffer.put(src)} copies FROM
+     * its argument INTO the receiver, so the new buffer must be the receiver.)
+     */
+    private Record copyOf(Record record) {
+        ByteBuffer source = record.buffer().duplicate();
+        source.rewind();
+        // NOTE(review): assumes record.size() covers the whole backing buffer (position 0 up to its limit) -- confirm
+        ByteBuffer copy = ByteBuffer.allocate(record.size());
+        copy.put(source);
+        copy.rewind();
+        return new Record(copy);
+    }
+
+    /**
+     * A byte-for-byte copy must compare equal to the record it was copied from.
+     */
+    @Test
+    public void testEquality() {
+        assertEquals(record, copyOf(record));
+    }
+
+    /**
+     * Builds the parameter sets: every pairing of a null, empty and non-empty key with a null, empty and non-empty
+     * value, each repeated for every {@link CompressionType}.
+     */
+    @Parameters
+    public static Collection<Object[]> data() {
+        List<Object[]> values = new ArrayList<Object[]>();
+        for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes()))
+            for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes()))
+                for (CompressionType compression : CompressionType.values())
+                    values.add(new Object[] { key, value, compression });
+        return values;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java b/clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java
new file mode 100644
index 0000000..7662d38
--- /dev/null
+++ b/clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java
@@ -0,0 +1,54 @@
+package kafka.common.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.junit.Test;
+
+/**
+ * Tests {@link AbstractIterator} through a minimal list-backed subclass.
+ */
+public class AbstractIteratorTest {
+
+    /**
+     * Walks a ten-element list and checks, at every step, that peek() and next() both return the expected element
+     * and that hasNext() holds until the list is used up.
+     */
+    @Test
+    public void testIterator() {
+        int count = 10;
+        List<Integer> numbers = new ArrayList<Integer>();
+        for (int n = 0; n < count; n++)
+            numbers.add(n);
+
+        ListIterator<Integer> iterator = new ListIterator<Integer>(numbers);
+        int expected = 0;
+        while (expected < count) {
+            Integer boxed = expected;
+            assertEquals(boxed, iterator.peek());
+            assertTrue(iterator.hasNext());
+            assertEquals(boxed, iterator.next());
+            expected++;
+        }
+        assertFalse(iterator.hasNext());
+    }
+
+    /**
+     * Calling next() on an iterator over an empty list must throw NoSuchElementException.
+     */
+    @Test(expected = NoSuchElementException.class)
+    public void testEmptyIterator() {
+        Iterator<Object> iterator = new ListIterator<Object>(Arrays.asList());
+        iterator.next();
+    }
+
+    /**
+     * Hands out the elements of a list in index order and signals completion once the end is reached.
+     */
+    class ListIterator<T> extends AbstractIterator<T> {
+        private List<T> list;
+        private int position = 0;
+
+        public ListIterator(List<T> l) {
+            list = l;
+        }
+
+        public T makeNext() {
+            if (position >= list.size())
+                return allDone();
+            T element = list.get(position);
+            position++;
+            return element;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/common/utils/MockTime.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/utils/MockTime.java b/clients/src/test/java/kafka/common/utils/MockTime.java
new file mode 100644
index 0000000..095d4f6
--- /dev/null
+++ b/clients/src/test/java/kafka/common/utils/MockTime.java
@@ -0,0 +1,28 @@
+package kafka.common.utils;
+
+import java.util.concurrent.TimeUnit;
+
+public class MockTime implements Time {
+
+ private long nanos = 0;
+
+ public MockTime() {
+ this.nanos = System.nanoTime();
+ }
+
+ @Override
+ public long milliseconds() {
+ return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public long nanoseconds() {
+ return nanos;
+ }
+
+ @Override
+ public void sleep(long ms) {
+ this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/test/MetricsBench.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/test/MetricsBench.java b/clients/src/test/java/kafka/test/MetricsBench.java
new file mode 100644
index 0000000..2b164bd
--- /dev/null
+++ b/clients/src/test/java/kafka/test/MetricsBench.java
@@ -0,0 +1,38 @@
+package kafka.test;
+
+import java.util.Arrays;
+
+import kafka.common.metrics.Metrics;
+import kafka.common.metrics.Sensor;
+import kafka.common.metrics.stats.Avg;
+import kafka.common.metrics.stats.Count;
+import kafka.common.metrics.stats.Max;
+import kafka.common.metrics.stats.Percentile;
+import kafka.common.metrics.stats.Percentiles;
+import kafka.common.metrics.stats.Percentiles.BucketSizing;
+
+/**
+ * A command-line micro-benchmark that reports the average cost, in nanoseconds, of recording one value on a sensor
+ * that has a parent sensor, with avg, count, max and percentile stats attached to both.
+ */
+public class MetricsBench {
+
+    /**
+     * @param args a single argument: the number of values to record
+     */
+    public static void main(String[] args) {
+        if (args.length < 1) {
+            System.err.println("USAGE: java " + MetricsBench.class.getName() + " iterations");
+            System.exit(1);
+        }
+        long iters = Long.parseLong(args[0]);
+        Metrics metrics = new Metrics();
+        Sensor parent = metrics.sensor("parent");
+        Sensor child = metrics.sensor("child", parent);
+        for (Sensor sensor : Arrays.asList(parent, child)) {
+            sensor.add(sensor.name() + ".avg", new Avg());
+            sensor.add(sensor.name() + ".count", new Count());
+            sensor.add(sensor.name() + ".max", new Max());
+            sensor.add(new Percentiles(1024,
+                                       0.0,
+                                       iters,
+                                       BucketSizing.CONSTANT,
+                                       new Percentile(sensor.name() + ".median", 50.0),
+                                       new Percentile(sensor.name() + ".p_99", 99.0)));
+        }
+        long start = System.nanoTime();
+        // The index must be a long: an int would wrap before reaching a bound above Integer.MAX_VALUE and the loop
+        // would never terminate.
+        for (long i = 0; i < iters; i++)
+            child.record(i);
+        double elapsed = (System.nanoTime() - start) / (double) iters;
+        System.out.println(String.format("%.2f ns per metric recording.", elapsed));
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/test/Microbenchmarks.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/test/Microbenchmarks.java b/clients/src/test/java/kafka/test/Microbenchmarks.java
new file mode 100644
index 0000000..a0ddecb
--- /dev/null
+++ b/clients/src/test/java/kafka/test/Microbenchmarks.java
@@ -0,0 +1,143 @@
+package kafka.test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import kafka.common.utils.CopyOnWriteMap;
+import kafka.common.utils.SystemTime;
+
+public class Microbenchmarks {
+
+ public static void main(String[] args) throws Exception {
+
+ final int iters = Integer.parseInt(args[0]);
+ double x = 0.0;
+ long start = System.nanoTime();
+ for (int i = 0; i < iters; i++)
+ x += Math.sqrt(x);
+ System.out.println(x);
+ System.out.println("sqrt: " + (System.nanoTime() - start) / (double) iters);
+
+ // test clocks
+ systemMillis(iters);
+ systemNanos(iters);
+ long total = 0;
+ start = System.nanoTime();
+ total += systemMillis(iters);
+ System.out.println("System.currentTimeMillis(): " + (System.nanoTime() - start) / iters);
+ start = System.nanoTime();
+ total += systemNanos(iters);
+ System.out.println("System.nanoTime(): " + (System.nanoTime() - start) / iters);
+ System.out.println(total);
+
+ // test random
+ int n = 0;
+ Random random = new Random();
+ start = System.nanoTime();
+ for (int i = 0; i < iters; i++) {
+ n += random.nextInt();
+ }
+ System.out.println(n);
+ System.out.println("random: " + (System.nanoTime() - start) / iters);
+
+ float[] floats = new float[1024];
+ for (int i = 0; i < floats.length; i++)
+ floats[i] = random.nextFloat();
+ Arrays.sort(floats);
+
+ int loc = 0;
+ start = System.nanoTime();
+ for (int i = 0; i < iters; i++)
+ loc += Arrays.binarySearch(floats, floats[i % floats.length]);
+ System.out.println(loc);
+ System.out.println("binary search: " + (System.nanoTime() - start) / iters);
+
+ final SystemTime time = new SystemTime();
+ final AtomicBoolean done = new AtomicBoolean(false);
+ final Object lock = new Object();
+ Thread t1 = new Thread() {
+ public void run() {
+ time.sleep(1);
+ int counter = 0;
+ long start = time.nanoseconds();
+ for (int i = 0; i < iters; i++) {
+ synchronized (lock) {
+ counter++;
+ }
+ }
+ System.out.println("synchronized: " + ((System.nanoTime() - start) / iters));
+ System.out.println(counter);
+ done.set(true);
+ }
+ };
+
+ Thread t2 = new Thread() {
+ public void run() {
+ int counter = 0;
+ while (!done.get()) {
+ time.sleep(1);
+ synchronized (lock) {
+ counter += 1;
+ }
+ }
+ System.out.println("Counter: " + counter);
+ }
+ };
+
+ t1.start();
+ t2.start();
+ t1.join();
+ t2.join();
+
+ Map<String, Integer> values = new HashMap<String, Integer>();
+ for (int i = 0; i < 100; i++)
+ values.put(Integer.toString(i), i);
+ System.out.println("HashMap:");
+ benchMap(2, 1000000, values);
+ System.out.println("ConcurentHashMap:");
+ benchMap(2, 1000000, new ConcurrentHashMap<String, Integer>(values));
+ System.out.println("CopyOnWriteMap:");
+ benchMap(2, 1000000, new CopyOnWriteMap<String, Integer>(values));
+ }
+
+ private static void benchMap(int numThreads, final int iters, final Map<String, Integer> map) throws Exception {
+ final List<String> keys = new ArrayList<String>(map.keySet());
+ final List<Thread> threads = new ArrayList<Thread>();
+ for (int i = 0; i < numThreads; i++) {
+ threads.add(new Thread() {
+ public void run() {
+ int sum = 0;
+ long start = System.nanoTime();
+ for (int j = 0; j < iters; j++)
+ map.get(keys.get(j % threads.size()));
+ System.out.println("Map access time: " + ((System.nanoTime() - start) / (double) iters));
+ }
+ });
+ }
+ for (Thread thread : threads)
+ thread.start();
+ for (Thread thread : threads)
+ thread.join();
+ }
+
+ private static long systemMillis(int iters) {
+ long total = 0;
+ for (int i = 0; i < iters; i++)
+ total += System.currentTimeMillis();
+ return total;
+ }
+
+ private static long systemNanos(int iters) {
+ long total = 0;
+ for (int i = 0; i < iters; i++)
+ total += System.currentTimeMillis();
+ return total;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/test/java/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/test/MockSelector.java b/clients/src/test/java/kafka/test/MockSelector.java
new file mode 100644
index 0000000..15508f4
--- /dev/null
+++ b/clients/src/test/java/kafka/test/MockSelector.java
@@ -0,0 +1,87 @@
+package kafka.test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import kafka.common.network.NetworkReceive;
+import kafka.common.network.NetworkSend;
+import kafka.common.network.Selectable;
+import kafka.common.utils.Time;
+
+/**
+ * A stand-in {@link Selectable} for tests. It performs no network I/O: it only records the connection ids and sends
+ * it is given, lets the test inject completed sends and receives, and "waits" by sleeping on the supplied
+ * {@link Time}.
+ */
+public class MockSelector implements Selectable {
+
+    private final Time time;
+    private final List<Integer> connected = new ArrayList<Integer>();
+    private final List<Integer> disconnected = new ArrayList<Integer>();
+    private final List<NetworkSend> completedSends = new ArrayList<NetworkSend>();
+    private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();
+
+    public MockSelector(Time time) {
+        this.time = time;
+    }
+
+    /**
+     * Records {@code id} as connected; the address and buffer sizes are ignored.
+     */
+    @Override
+    public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
+        connected.add(id);
+    }
+
+    /**
+     * Records {@code id} as disconnected.
+     */
+    @Override
+    public void disconnect(int id) {
+        disconnected.add(id);
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void wakeup() {
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Empties all four recorded lists.
+     */
+    public void clear() {
+        completedSends.clear();
+        completedReceives.clear();
+        disconnected.clear();
+        connected.clear();
+    }
+
+    /**
+     * Marks every given send as completed, then sleeps on the configured time for {@code timeout}.
+     */
+    @Override
+    public void poll(long timeout, List<NetworkSend> sends) throws IOException {
+        completedSends.addAll(sends);
+        time.sleep(timeout);
+    }
+
+    /**
+     * Adds {@code send} to the completed sends.
+     */
+    public void completeSend(NetworkSend send) {
+        completedSends.add(send);
+    }
+
+    /**
+     * Adds {@code receive} to the completed receives.
+     */
+    public void completeReceive(NetworkReceive receive) {
+        completedReceives.add(receive);
+    }
+
+    /**
+     * The live list of completed sends (not a copy).
+     */
+    @Override
+    public List<NetworkSend> completedSends() {
+        return completedSends;
+    }
+
+    /**
+     * The live list of completed receives (not a copy).
+     */
+    @Override
+    public List<NetworkReceive> completedReceives() {
+        return completedReceives;
+    }
+
+    /**
+     * The live list of ids passed to {@link #disconnect(int)} (not a copy).
+     */
+    @Override
+    public List<Integer> disconnected() {
+        return disconnected;
+    }
+
+    /**
+     * The live list of ids passed to connect (not a copy).
+     */
+    @Override
+    public List<Integer> connected() {
+        return connected;
+    }
+
+}