Posted to commits@kafka.apache.org by jk...@apache.org on 2014/01/28 20:16:32 UTC

[6/7] KAFKA-1227 New producer!

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/kafka/clients/producer/internals/Sender.java
new file mode 100644
index 0000000..effeb9c
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/internals/Sender.java
@@ -0,0 +1,503 @@
+package kafka.clients.producer.internals;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import kafka.common.Cluster;
+import kafka.common.Node;
+import kafka.common.TopicPartition;
+import kafka.common.errors.NetworkException;
+import kafka.common.network.NetworkReceive;
+import kafka.common.network.NetworkSend;
+import kafka.common.network.Selectable;
+import kafka.common.protocol.ApiKeys;
+import kafka.common.protocol.Errors;
+import kafka.common.protocol.ProtoUtils;
+import kafka.common.protocol.types.Struct;
+import kafka.common.requests.RequestHeader;
+import kafka.common.requests.RequestSend;
+import kafka.common.requests.ResponseHeader;
+import kafka.common.utils.Time;
+
+/**
+ * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
+ * requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
+ */
+public class Sender implements Runnable {
+
+    private final Map<Integer, NodeState> nodeState;
+    private final RecordAccumulator accumulator;
+    private final Selectable selector;
+    private final String clientId;
+    private final int maxRequestSize;
+    private final long reconnectBackoffMs;
+    private final short acks;
+    private final int requestTimeout;
+    private final InFlightRequests inFlightRequests;
+    private final Metadata metadata;
+    private final Time time;
+    private int correlation;
+    private boolean metadataFetchInProgress;
+    private volatile boolean running;
+
+    public Sender(Selectable selector,
+                  Metadata metadata,
+                  RecordAccumulator accumulator,
+                  String clientId,
+                  int maxRequestSize,
+                  long reconnectBackoffMs,
+                  short acks,
+                  int requestTimeout,
+                  Time time) {
+        this.nodeState = new HashMap<Integer, NodeState>();
+        this.accumulator = accumulator;
+        this.selector = selector;
+        this.maxRequestSize = maxRequestSize;
+        this.reconnectBackoffMs = reconnectBackoffMs;
+        this.metadata = metadata;
+        this.clientId = clientId;
+        this.running = true;
+        this.requestTimeout = requestTimeout;
+        this.acks = acks;
+        this.inFlightRequests = new InFlightRequests();
+        this.correlation = 0;
+        this.metadataFetchInProgress = false;
+        this.time = time;
+    }
+
+    /**
+     * The main run loop for the sender thread
+     */
+    public void run() {
+        // main loop, runs until close is called
+        while (running) {
+            try {
+                run(time.milliseconds());
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+        // send anything left in the accumulator
+        int unsent = 0;
+        do {
+            try {
+                unsent = run(time.milliseconds());
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        } while (unsent > 0);
+
+        // close all the connections
+        this.selector.close();
+    }
+
+    /**
+     * Run a single iteration of sending
+     * 
+     * @param now The current time
+     * @return The total number of topic/partitions that had data ready (regardless of what we actually sent)
+     */
+    public int run(long now) {
+        Cluster cluster = metadata.fetch();
+        // get the list of partitions with data ready to send
+        List<TopicPartition> ready = this.accumulator.ready(now);
+
+        // prune the list of ready topic-partitions, eliminating any we aren't able to send to yet
+        List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
+
+        // should we update our metadata?
+        List<NetworkSend> sends = new ArrayList<NetworkSend>(sendable.size());
+        InFlightRequest metadataReq = maybeMetadataRequest(cluster, now);
+        if (metadataReq != null) {
+            sends.add(metadataReq.request);
+            this.inFlightRequests.add(metadataReq);
+        }
+
+        // create produce requests
+        List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize);
+        List<InFlightRequest> requests = collate(cluster, batches);
+        for (int i = 0; i < requests.size(); i++) {
+            InFlightRequest request = requests.get(i);
+            this.inFlightRequests.add(request);
+            sends.add(request.request);
+        }
+
+        // do the I/O
+        try {
+            this.selector.poll(5L, sends);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        // handle responses, connections, and disconnections
+        handleSends(this.selector.completedSends());
+        handleResponses(this.selector.completedReceives(), now);
+        handleDisconnects(this.selector.disconnected());
+        handleConnects(this.selector.connected());
+
+        return ready.size();
+    }
+
+    private InFlightRequest maybeMetadataRequest(Cluster cluster, long now) {
+        if (this.metadataFetchInProgress || !metadata.needsUpdate(now))
+            return null;
+        Node node = cluster.nextNode();
+        NodeState state = nodeState.get(node.id());
+        if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) {
+            // we don't have a connection to this node right now, make one
+            initiateConnect(node, now);
+            return null;
+        } else if (state.state == ConnectionState.CONNECTED) {
+            this.metadataFetchInProgress = true;
+            return metadataRequest(node.id(), metadata.topics());
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Start closing the sender (won't actually complete until all data is sent out)
+     */
+    public void initiateClose() {
+        this.running = false;
+        this.accumulator.close();
+    }
+
+    /**
+     * Process the set of topic-partitions with data ready to send. If we have a connection to the appropriate node, add
+     * it to the returned set. For any partition we have no connection to, either initiate a connection or fetch the
+     * metadata needed to do so.
+     */
+    private List<TopicPartition> processReadyPartitions(Cluster cluster, List<TopicPartition> ready, long now) {
+        List<TopicPartition> sendable = new ArrayList<TopicPartition>(ready.size());
+        for (TopicPartition tp : ready) {
+            Node node = cluster.leaderFor(tp);
+            if (node == null) {
+                // we don't know about this topic/partition or it has no leader, re-fetch metadata
+                metadata.forceUpdate();
+            } else {
+                NodeState state = nodeState.get(node.id());
+                // TODO: encapsulate this logic somehow
+                if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) {
+                    // we don't have a connection to this node right now, make one
+                    initiateConnect(node, now);
+                } else if (state.state == ConnectionState.CONNECTED && inFlightRequests.canSendMore(node.id())) {
+                    sendable.add(tp);
+                }
+            }
+        }
+        return sendable;
+    }
+
+    /**
+     * Initiate a connection to the given node
+     */
+    private void initiateConnect(Node node, long now) {
+        try {
+            selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), 64 * 1024 * 1024, 64 * 1024 * 1024); // TODO: socket buffers
+            nodeState.put(node.id(), new NodeState(ConnectionState.CONNECTING, now));
+        } catch (IOException e) {
+            /* attempt failed, we'll try again after the backoff */
+            nodeState.put(node.id(), new NodeState(ConnectionState.DISCONNECTED, now));
+            /* maybe the problem is our metadata, update it */
+            metadata.forceUpdate();
+        }
+    }
+
+    /**
+     * Handle any closed connections
+     */
+    private void handleDisconnects(List<Integer> disconnects) {
+        for (int node : disconnects) {
+            /* mark the node disconnected even if it had no requests in flight */
+            NodeState state = this.nodeState.get(node);
+            if (state != null)
+                state.state = ConnectionState.DISCONNECTED;
+            /* fail any requests that were outstanding to this node */
+            for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
+                if (request.batches != null) {
+                    for (RecordBatch batch : request.batches.values())
+                        batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
+                    this.accumulator.deallocate(request.batches.values());
+                }
+            }
+        }
+    }
+
+    /**
+     * Record any connections that completed in our node state
+     */
+    private void handleConnects(List<Integer> connects) {
+        for (Integer id : connects)
+            this.nodeState.get(id).state = ConnectionState.CONNECTED;
+    }
+
+    /**
+     * Process completed sends
+     */
+    public void handleSends(List<NetworkSend> sends) {
+        /* if acks = 0 then the request is satisfied once sent */
+        for (NetworkSend send : sends) {
+            Deque<InFlightRequest> requests = this.inFlightRequests.requestQueue(send.destination());
+            InFlightRequest request = requests.peekFirst();
+            if (!request.expectResponse) {
+                requests.pollFirst();
+                if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) {
+                    for (RecordBatch batch : request.batches.values())
+                        batch.done(-1L, Errors.NONE.exception());
+                    this.accumulator.deallocate(request.batches.values());
+                }
+            }
+        }
+    }
+
+    /**
+     * Handle responses from the server
+     */
+    private void handleResponses(List<NetworkReceive> receives, long now) {
+        for (NetworkReceive receive : receives) {
+            int source = receive.source();
+            InFlightRequest req = inFlightRequests.nextCompleted(source);
+            ResponseHeader header = ResponseHeader.parse(receive.payload());
+            short apiKey = req.request.header().apiKey();
+            Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
+            correlate(req.request.header(), header);
+            if (req.request.header().apiKey() == ApiKeys.PRODUCE.id)
+                handleProduceResponse(req, body);
+            else if (req.request.header().apiKey() == ApiKeys.METADATA.id)
+                handleMetadataResponse(body, now);
+            else
+                throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey());
+        }
+    }
+
+    private void handleMetadataResponse(Struct body, long now) {
+        this.metadataFetchInProgress = false;
+        this.metadata.update(ProtoUtils.parseMetadataResponse(body), now);
+    }
+
+    /**
+     * Handle a produce response
+     */
+    private void handleProduceResponse(InFlightRequest request, Struct response) {
+        for (Object topicResponse : (Object[]) response.get("responses")) {
+            Struct topicRespStruct = (Struct) topicResponse;
+            String topic = (String) topicRespStruct.get("topic");
+            for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) {
+                Struct partRespStruct = (Struct) partResponse;
+                int partition = (Integer) partRespStruct.get("partition");
+                short errorCode = (Short) partRespStruct.get("error_code");
+                long offset = (Long) partRespStruct.get("base_offset");
+                RecordBatch batch = request.batches.get(new TopicPartition(topic, partition));
+                batch.done(offset, Errors.forCode(errorCode).exception());
+            }
+        }
+        this.accumulator.deallocate(request.batches.values());
+    }
+
+    /**
+     * Validate that the response corresponds to the request we expect or else explode
+     */
+    private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
+        if (requestHeader.correlationId() != responseHeader.correlationId())
+            throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
+                                            + ") does not match request ("
+                                            + requestHeader.correlationId()
+                                            + ")");
+    }
+
+    /**
+     * Create a metadata request for the given topics
+     */
+    private InFlightRequest metadataRequest(int node, Set<String> topics) {
+        String[] ts = topics.toArray(new String[topics.size()]);
+        Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id));
+        body.set("topics", ts);
+        RequestSend send = new RequestSend(node, new RequestHeader(ApiKeys.METADATA.id, clientId, correlation++), body);
+        return new InFlightRequest(true, send, null);
+    }
+
+    /**
+     * Collate the record batches into a list of produce requests on a per-node basis
+     */
+    private List<InFlightRequest> collate(Cluster cluster, List<RecordBatch> batches) {
+        Map<Integer, List<RecordBatch>> collated = new HashMap<Integer, List<RecordBatch>>();
+        for (RecordBatch batch : batches) {
+            Node node = cluster.leaderFor(batch.topicPartition);
+            List<RecordBatch> found = collated.get(node.id());
+            if (found == null) {
+                found = new ArrayList<RecordBatch>();
+                collated.put(node.id(), found);
+            }
+            found.add(batch);
+        }
+        List<InFlightRequest> requests = new ArrayList<InFlightRequest>(collated.size());
+        for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
+            requests.add(produceRequest(entry.getKey(), acks, requestTimeout, entry.getValue()));
+        return requests;
+    }
+
+    /**
+     * Create a produce request from the given record batches
+     */
+    private InFlightRequest produceRequest(int destination, short acks, int timeout, List<RecordBatch> batches) {
+        Map<TopicPartition, RecordBatch> batchesByPartition = new HashMap<TopicPartition, RecordBatch>();
+        Map<String, List<RecordBatch>> batchesByTopic = new HashMap<String, List<RecordBatch>>();
+        for (RecordBatch batch : batches) {
+            batchesByPartition.put(batch.topicPartition, batch);
+            List<RecordBatch> found = batchesByTopic.get(batch.topicPartition.topic());
+            if (found == null) {
+                found = new ArrayList<RecordBatch>();
+                batchesByTopic.put(batch.topicPartition.topic(), found);
+            }
+            found.add(batch);
+        }
+        Struct produce = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id));
+        produce.set("acks", acks);
+        produce.set("timeout", timeout);
+        List<Struct> topicDatas = new ArrayList<Struct>(batchesByTopic.size());
+        for (Map.Entry<String, List<RecordBatch>> entry : batchesByTopic.entrySet()) {
+            Struct topicData = produce.instance("topic_data");
+            topicData.set("topic", entry.getKey());
+            List<RecordBatch> parts = entry.getValue();
+            Object[] partitionData = new Object[parts.size()];
+            for (int i = 0; i < parts.size(); i++) {
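+                // the records buffer was in write mode while the batch was being appended to;
+                // flip it to read mode so its contents can be written to the socket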
+                ByteBuffer buffer = parts.get(i).records.buffer();
+                buffer.flip();
+                Struct part = topicData.instance("data")
+                                       .set("partition", parts.get(i).topicPartition.partition())
+                                       .set("message_set", buffer);
+                partitionData[i] = part;
+            }
+            topicData.set("data", partitionData);
+            topicDatas.add(topicData);
+        }
+        produce.set("topic_data", topicDatas.toArray());
+
+        RequestHeader header = new RequestHeader(ApiKeys.PRODUCE.id, clientId, correlation++);
+        RequestSend send = new RequestSend(destination, header, produce);
+        return new InFlightRequest(acks != 0, send, batchesByPartition);
+    }
+
+    /**
+     * Wake up the selector associated with this send thread
+     */
+    public void wakeup() {
+        this.selector.wakeup();
+    }
+
+    /**
+     * The states of a node connection
+     */
+    private static enum ConnectionState {
+        DISCONNECTED, CONNECTING, CONNECTED
+    }
+
+    /**
+     * The state of a node
+     */
+    private static final class NodeState {
+        private ConnectionState state;
+        private long lastConnectAttempt;
+
+        public NodeState(ConnectionState state, long lastConnectAttempt) {
+            this.state = state;
+            this.lastConnectAttempt = lastConnectAttempt;
+        }
+
+        public String toString() {
+            return "NodeState(" + state + ", " + lastConnectAttempt + ")";
+        }
+    }
+
+    /**
+     * A request that hasn't been fully processed yet
+     */
+    private static final class InFlightRequest {
+        public boolean expectResponse;
+        public Map<TopicPartition, RecordBatch> batches;
+        public RequestSend request;
+
+        /**
+         * @param expectResponse Should we expect a response message or is this request complete once it is sent?
+         * @param request The request
+         * @param batches The record batches contained in the request if it is a produce request
+         */
+        public InFlightRequest(boolean expectResponse, RequestSend request, Map<TopicPartition, RecordBatch> batches) {
+            this.batches = batches;
+            this.request = request;
+            this.expectResponse = expectResponse;
+        }
+    }
+
+    /**
+     * The per-node queues of requests that have been sent but have not yet received a response
+     */
+    private static final class InFlightRequests {
+        private final Map<Integer, Deque<InFlightRequest>> requests = new HashMap<Integer, Deque<InFlightRequest>>();
+
+        /**
+         * Add the given request to the queue for the node it was directed to
+         */
+        public void add(InFlightRequest request) {
+            Deque<InFlightRequest> reqs = this.requests.get(request.request.destination());
+            if (reqs == null) {
+                reqs = new ArrayDeque<InFlightRequest>();
+                this.requests.put(request.request.destination(), reqs);
+            }
+            reqs.addFirst(request);
+        }
+
+        public Deque<InFlightRequest> requestQueue(int node) {
+            Deque<InFlightRequest> reqs = requests.get(node);
+            if (reqs == null || reqs.isEmpty())
+                throw new IllegalStateException("Response from server for which there are no in-flight requests.");
+            return reqs;
+        }
+
+        /**
+         * Get the oldest request (the one that will be completed next) for the given node
+         */
+        public InFlightRequest nextCompleted(int node) {
+            return requestQueue(node).pollLast();
+        }
+
+        /**
+         * Can we send more requests to this node?
+         * 
+         * @param node Node in question
+         * @return true iff we have no requests still being sent to the given node
+         */
+        public boolean canSendMore(int node) {
+            Deque<InFlightRequest> queue = requests.get(node);
+            return queue == null || queue.isEmpty() || queue.peekFirst().request.complete();
+        }
+
+        /**
+         * Clear out all the in-flight requests for the given node and return them
+         * 
+         * @param node The node
+         * @return All the in-flight requests for that node that have been removed
+         */
+        public Iterable<InFlightRequest> clearAll(int node) {
+            Deque<InFlightRequest> reqs = requests.remove(node);
+            if (reqs == null)
+                return Collections.emptyList();
+            else
+                return reqs;
+        }
+    }
+
+}
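
A usage sketch, not part of the patch (the thread name is made up and construction of the
collaborators is elided), showing how the producer is expected to drive this Runnable:

    Sender sender = new Sender(selector, metadata, accumulator, clientId,
                               maxRequestSize, reconnectBackoffMs, acks,
                               requestTimeout, time);
    Thread ioThread = new Thread(sender, "kafka-network-thread");
    ioThread.setDaemon(true);
    ioThread.start();
    // ... the user-facing producer appends records to the accumulator ...
    sender.initiateClose(); // run() keeps iterating until all queued data is drained
    ioThread.join();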

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
new file mode 100644
index 0000000..7331b73
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
@@ -0,0 +1,70 @@
+package kafka.clients.tools;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+import kafka.clients.producer.Callback;
+import kafka.clients.producer.KafkaProducer;
+import kafka.clients.producer.ProducerConfig;
+import kafka.clients.producer.ProducerRecord;
+import kafka.clients.producer.RecordSend;
+import kafka.common.ByteSerialization;
+
+public class ProducerPerformance {
+
+    public static void main(String[] args) throws Exception {
+        if (args.length != 3) {
+            System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url num_messages message_size");
+            System.exit(1);
+        }
+        String url = args[0];
+        int numMessages = Integer.parseInt(args[1]);
+        int messageSize = Integer.parseInt(args[2]);
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, "1");
+        props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url);
+        props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
+        props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteSerialization.class.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteSerialization.class.getName());
+
+        KafkaProducer producer = new KafkaProducer(props);
+        Callback callback = new Callback() {
+            public void onCompletion(RecordSend send) {
+                try {
+                    send.offset();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        byte[] payload = new byte[messageSize];
+        Arrays.fill(payload, (byte) 1);
+        ProducerRecord record = new ProducerRecord("test", payload);
+        long start = System.currentTimeMillis();
+        long maxLatency = -1L;
+        long totalLatency = 0;
+        int reportingInterval = 1000000;
+        for (int i = 0; i < numMessages; i++) {
+            long sendStart = System.currentTimeMillis();
+            producer.send(record, null);
+            long sendElapsed = System.currentTimeMillis() - sendStart;
+            maxLatency = Math.max(maxLatency, sendElapsed);
+            totalLatency += sendElapsed;
+            if (i % reportingInterval == 0) {
+                System.out.printf("%d  max latency = %d ms, avg latency = %.5f\n",
+                                  i,
+                                  maxLatency,
+                                  (totalLatency / (double) reportingInterval));
+                totalLatency = 0L;
+                maxLatency = -1L;
+            }
+        }
+        long elapsed = System.currentTimeMillis() - start;
+        double msgsSec = 1000.0 * numMessages / (double) elapsed;
+        double mbSec = msgsSec * messageSize / (1024.0 * 1024.0);
+        System.out.printf("%d messages sent in %d ms. %.2f messages per second (%.2f mb/sec).", numMessages, elapsed, msgsSec, mbSec);
+        producer.close();
+    }
+
+}
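
Note that the loop above times only the send() call itself, which returns as soon as the
record is queued. A sketch (using the Callback/RecordSend API from this patch) of measuring
end-to-end latency instead:

    final long enqueuedMs = System.currentTimeMillis();
    producer.send(record, new Callback() {
        public void onCompletion(RecordSend send) {
            // invoked once the server response (or an error) arrives
            long endToEndMs = System.currentTimeMillis() - enqueuedMs;
            // accumulate endToEndMs in a histogram rather than printing per record
        }
    });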

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/ByteSerialization.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/ByteSerialization.java b/clients/src/main/java/kafka/common/ByteSerialization.java
new file mode 100644
index 0000000..eca69f1
--- /dev/null
+++ b/clients/src/main/java/kafka/common/ByteSerialization.java
@@ -0,0 +1,18 @@
+package kafka.common;
+
+/**
+ * A serialization implementation that just retains the provided byte array unchanged
+ */
+public class ByteSerialization implements Serializer, Deserializer {
+
+    @Override
+    public Object fromBytes(byte[] bytes) {
+        return bytes;
+    }
+
+    @Override
+    public byte[] toBytes(Object o) {
+        return (byte[]) o;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Cluster.java b/clients/src/main/java/kafka/common/Cluster.java
new file mode 100644
index 0000000..d0acd8d
--- /dev/null
+++ b/clients/src/main/java/kafka/common/Cluster.java
@@ -0,0 +1,102 @@
+package kafka.common;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import kafka.common.utils.Utils;
+
+/**
+ * A representation of the nodes, topics, and partitions in the Kafka cluster
+ */
+public final class Cluster {
+
+    private final AtomicInteger counter = new AtomicInteger(0);
+    private final List<Node> nodes;
+    private final Map<Integer, Node> nodesById;
+    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
+    private final Map<String, List<PartitionInfo>> partitionsByTopic;
+
+    /**
+     * Create a new cluster with the given nodes and partitions
+     * @param nodes The nodes in the cluster
+     * @param partitions Information about a subset of the topic-partitions this cluster hosts
+     */
+    public Cluster(Collection<Node> nodes, Collection<PartitionInfo> partitions) {
+        this.nodes = new ArrayList<Node>(nodes);
+        this.nodesById = new HashMap<Integer, Node>(this.nodes.size());
+        this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
+        this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(partitions.size());
+
+        Collections.shuffle(this.nodes);
+        for (Node n : this.nodes)
+            this.nodesById.put(n.id(), n);
+        for (PartitionInfo p : partitions)
+            this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
+        for (PartitionInfo p : partitions) {
+            if (!this.partitionsByTopic.containsKey(p.topic()))
+                this.partitionsByTopic.put(p.topic(), new ArrayList<PartitionInfo>());
+            List<PartitionInfo> ps = this.partitionsByTopic.get(p.topic());
+            ps.add(p);
+        }
+    }
+
+    /**
+     * Create an empty cluster instance with no nodes and no topic-partitions.
+     */
+    public static Cluster empty() {
+        return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0));
+    }
+
+    /**
+     * Create a "bootstrap" cluster using the given list of host/ports
+     * @param addresses The addresses
+     * @return A cluster for these hosts/ports
+     */
+    public static Cluster bootstrap(List<InetSocketAddress> addresses) {
+        List<Node> nodes = new ArrayList<Node>();
+        int nodeId = Integer.MIN_VALUE;
+        for (InetSocketAddress address : addresses)
+            nodes.add(new Node(nodeId++, address.getHostName(), address.getPort()));
+        return new Cluster(nodes, new ArrayList<PartitionInfo>(0));
+    }
+
+    /**
+     * Get the current leader for the given topic-partition
+     * @param topicPartition The topic and partition we want to know the leader for
+     * @return The node that is the leader for this topic-partition, or null if there is currently no leader
+     */
+    public Node leaderFor(TopicPartition topicPartition) {
+        PartitionInfo info = partitionsByTopicPartition.get(topicPartition);
+        if (info == null)
+            return null;
+        else
+            return nodesById.get(info.leader());
+    }
+
+    /**
+     * Get the list of partitions for this topic
+     * @param topic The topic name
+     * @return A list of partitions
+     */
+    public List<PartitionInfo> partitionsFor(String topic) {
+        return this.partitionsByTopic.get(topic);
+    }
+
+    /**
+     * Round-robin over the nodes in this cluster
+     */
+    public Node nextNode() {
+        int size = nodes.size();
+        if (size == 0)
+            throw new IllegalStateException("No known nodes.");
+        int idx = Utils.abs(counter.getAndIncrement()) % size;
+        return this.nodes.get(idx);
+    }
+
+}
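
A bootstrap usage sketch (host names hypothetical). The synthetic node ids handed out by
bootstrap() start at Integer.MIN_VALUE, presumably so they cannot collide with real broker
ids learned later from a metadata response:

    List<InetSocketAddress> addresses = Arrays.asList(
        new InetSocketAddress("broker1.example.com", 9092),
        new InetSocketAddress("broker2.example.com", 9092));
    Cluster cluster = Cluster.bootstrap(addresses);
    Node first = cluster.nextNode();  // round-robin over the shuffled node list
    Node second = cluster.nextNode(); // the next node in the rotation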

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/Configurable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Configurable.java b/clients/src/main/java/kafka/common/Configurable.java
new file mode 100644
index 0000000..1af9dd4
--- /dev/null
+++ b/clients/src/main/java/kafka/common/Configurable.java
@@ -0,0 +1,15 @@
+package kafka.common;
+
+import java.util.Map;
+
+/**
+ * A Mix-in style interface for classes that are instantiated by reflection and need to take configuration parameters
+ */
+public interface Configurable {
+
+    /**
+     * Configure this class with the given key-value pairs
+     */
+    public void configure(Map<String, ?> configs);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/Deserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Deserializer.java b/clients/src/main/java/kafka/common/Deserializer.java
new file mode 100644
index 0000000..ad2e784
--- /dev/null
+++ b/clients/src/main/java/kafka/common/Deserializer.java
@@ -0,0 +1,18 @@
+package kafka.common;
+
+/**
+ * A class that controls how bytes are turned back into an object. Classes implementing this interface will generally
+ * be instantiated by the framework.
+ * <p>
+ * An implementation that requires special configuration parameters can implement {@link Configurable}
+ */
+public interface Deserializer {
+
+    /**
+     * Map a byte[] to an object
+     * @param bytes The bytes for the object (can be null)
+     * @return The deserialized object (can return null)
+     */
+    public Object fromBytes(byte[] bytes);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/KafkaException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/KafkaException.java b/clients/src/main/java/kafka/common/KafkaException.java
new file mode 100644
index 0000000..7182cac
--- /dev/null
+++ b/clients/src/main/java/kafka/common/KafkaException.java
@@ -0,0 +1,26 @@
+package kafka.common;
+
+/**
+ * The base class of all other Kafka exceptions
+ */
+public class KafkaException extends RuntimeException {
+
+    private final static long serialVersionUID = 1L;
+
+    public KafkaException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public KafkaException(String message) {
+        super(message);
+    }
+
+    public KafkaException(Throwable cause) {
+        super(cause);
+    }
+
+    public KafkaException() {
+        super();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/Metric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Metric.java b/clients/src/main/java/kafka/common/Metric.java
new file mode 100644
index 0000000..c29e331
--- /dev/null
+++ b/clients/src/main/java/kafka/common/Metric.java
@@ -0,0 +1,23 @@
+package kafka.common;
+
+/**
+ * A numerical metric tracked for monitoring purposes
+ */
+public interface Metric {
+
+    /**
+     * A unique name for this metric
+     */
+    public String name();
+
+    /**
+     * A description of what is measured; this will be the empty string if no description was given
+     */
+    public String description();
+
+    /**
+     * The value of the metric
+     */
+    public double value();
+
+}
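
A minimal sketch of an implementation (not part of this patch; the record() method is an
invention for the example), e.g. a settable gauge:

    public class Gauge implements Metric {
        private final String name;
        private volatile double value = 0.0;

        public Gauge(String name) { this.name = name; }
        public String name() { return name; }
        public String description() { return ""; }
        public double value() { return value; }
        public void record(double v) { this.value = v; }
    }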

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/Node.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Node.java b/clients/src/main/java/kafka/common/Node.java
new file mode 100644
index 0000000..81fc907
--- /dev/null
+++ b/clients/src/main/java/kafka/common/Node.java
@@ -0,0 +1,76 @@
+package kafka.common;
+
+/**
+ * Information about a Kafka node
+ */
+public class Node {
+
+    private final int id;
+    private final String host;
+    private final int port;
+
+    public Node(int id, String host, int port) {
+        super();
+        this.id = id;
+        this.host = host;
+        this.port = port;
+    }
+
+    /**
+     * The node id of this node
+     */
+    public int id() {
+        return id;
+    }
+
+    /**
+     * The host name for this node
+     */
+    public String host() {
+        return host;
+    }
+
+    /**
+     * The port for this node
+     */
+    public int port() {
+        return port;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((host == null) ? 0 : host.hashCode());
+        result = prime * result + id;
+        result = prime * result + port;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        Node other = (Node) obj;
+        if (host == null) {
+            if (other.host != null)
+                return false;
+        } else if (!host.equals(other.host))
+            return false;
+        if (id != other.id)
+            return false;
+        if (port != other.port)
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "Node(" + id + ", " + host + ", " + port + ")";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/PartitionInfo.java b/clients/src/main/java/kafka/common/PartitionInfo.java
new file mode 100644
index 0000000..f3f08dd
--- /dev/null
+++ b/clients/src/main/java/kafka/common/PartitionInfo.java
@@ -0,0 +1,58 @@
+package kafka.common;
+
+/**
+ * Information about a topic-partition.
+ */
+public class PartitionInfo {
+
+    private final String topic;
+    private final int partition;
+    private final int leader;
+    private final int[] replicas;
+    private final int[] inSyncReplicas;
+
+    public PartitionInfo(String topic, int partition, int leader, int[] replicas, int[] inSyncReplicas) {
+        this.topic = topic;
+        this.partition = partition;
+        this.leader = leader;
+        this.replicas = replicas;
+        this.inSyncReplicas = inSyncReplicas;
+    }
+
+    /**
+     * The topic name
+     */
+    public String topic() {
+        return topic;
+    }
+
+    /**
+     * The partition id
+     */
+    public int partition() {
+        return partition;
+    }
+
+    /**
+     * The node id of the node currently acting as a leader for this partition or -1 if there is no leader
+     */
+    public int leader() {
+        return leader;
+    }
+
+    /**
+     * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
+     */
+    public int[] replicas() {
+        return replicas;
+    }
+
+    /**
+     * The subset of the replicas that are in sync, that is, caught up to the leader and ready to take over as leader
+     * if the leader should fail
+     */
+    public int[] inSyncReplicas() {
+        return inSyncReplicas;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/Serializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Serializer.java b/clients/src/main/java/kafka/common/Serializer.java
new file mode 100644
index 0000000..63353d8
--- /dev/null
+++ b/clients/src/main/java/kafka/common/Serializer.java
@@ -0,0 +1,21 @@
+package kafka.common;
+
+/**
+ * A class that controls how an object is turned into bytes. Classes implementing this interface will generally be
+ * instantiated by the framework.
+ * <p>
+ * An implementation should handle null inputs.
+ * <p>
+ * An implementation that requires special configuration parameters can implement {@link Configurable}
+ */
+public interface Serializer {
+
+    /**
+     * Translate an object into bytes. The serializer must handle null inputs, and will generally just return null in
+     * this case.
+     * @param o The object to serialize, can be null
+     * @return The serialized bytes for the object or null
+     */
+    public byte[] toBytes(Object o);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/StringSerialization.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/StringSerialization.java b/clients/src/main/java/kafka/common/StringSerialization.java
new file mode 100644
index 0000000..c0ed5ca
--- /dev/null
+++ b/clients/src/main/java/kafka/common/StringSerialization.java
@@ -0,0 +1,58 @@
+package kafka.common;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+/**
+ * A serializer and deserializer for strings.
+ * <p>
+ * This class accepts a configuration parameter string.encoding which can take the string name of any supported
+ * encoding. If no encoding is specified the default will be UTF-8.
+ */
+public class StringSerialization implements Serializer, Deserializer, Configurable {
+
+    private final static String ENCODING_CONFIG = "string.encoding";
+
+    private String encoding;
+
+    public StringSerialization(String encoding) {
+        super();
+        this.encoding = encoding;
+    }
+
+    public StringSerialization() {
+        this("UTF8");
+    }
+
+    public void configure(Map<String, ?> configs) {
+        if (configs.containsKey(ENCODING_CONFIG))
+            this.encoding = (String) configs.get(ENCODING_CONFIG);
+    }
+
+    @Override
+    public Object fromBytes(byte[] bytes) {
+        if (bytes == null) {
+            return null;
+        } else {
+            try {
+                return new String(bytes, encoding);
+            } catch (UnsupportedEncodingException e) {
+                throw new KafkaException(e);
+            }
+        }
+    }
+
+    @Override
+    public byte[] toBytes(Object o) {
+        if (o == null) {
+            return null;
+        } else {
+            try {
+                return ((String) o).getBytes(encoding);
+            } catch (UnsupportedEncodingException e) {
+                throw new KafkaException(e);
+            }
+        }
+    }
+
+}
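
A usage sketch, including the configure() path a framework would take after instantiating
the class by reflection:

    StringSerialization serde = new StringSerialization();
    byte[] bytes = serde.toBytes("hello");              // UTF-8 by default
    String roundTrip = (String) serde.fromBytes(bytes); // "hello"

    Map<String, Object> configs = new HashMap<String, Object>();
    configs.put("string.encoding", "ISO-8859-1");
    serde.configure(configs);                           // subsequent calls use ISO-8859-1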

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/TopicPartition.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/TopicPartition.java b/clients/src/main/java/kafka/common/TopicPartition.java
new file mode 100644
index 0000000..e7be96c
--- /dev/null
+++ b/clients/src/main/java/kafka/common/TopicPartition.java
@@ -0,0 +1,61 @@
+package kafka.common;
+
+/**
+ * A topic name and partition number
+ */
+public final class TopicPartition {
+
+    private int hash = 0;
+    private final int partition;
+    private final String topic;
+
+    public TopicPartition(String topic, int partition) {
+        this.partition = partition;
+        this.topic = topic;
+    }
+
+    public int partition() {
+        return partition;
+    }
+
+    public String topic() {
+        return topic;
+    }
+
+    @Override
+    public int hashCode() {
+        if (hash != 0)
+            return hash;
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + partition;
+        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+        this.hash = result;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        TopicPartition other = (TopicPartition) obj;
+        if (partition != other.partition)
+            return false;
+        if (topic == null) {
+            if (other.topic != null)
+                return false;
+        } else if (!topic.equals(other.topic))
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return topic + "-" + partition;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/config/AbstractConfig.java b/clients/src/main/java/kafka/common/config/AbstractConfig.java
new file mode 100644
index 0000000..5db302d
--- /dev/null
+++ b/clients/src/main/java/kafka/common/config/AbstractConfig.java
@@ -0,0 +1,93 @@
+package kafka.common.config;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import kafka.common.Configurable;
+import kafka.common.KafkaException;
+import kafka.common.utils.Utils;
+
+/**
+ * A convenient base class for configurations to extend.
+ * <p>
+ * This class holds both the original configuration that was provided as well as the parsed and validated values.
+ */
+public class AbstractConfig {
+
+    private final Set<String> used;
+    private final Map<String, Object> values;
+    private final Map<String, ?> originals;
+
+    @SuppressWarnings("unchecked")
+    public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
+        /* check that all the keys are really strings */
+        for (Object key : originals.keySet())
+            if (!(key instanceof String))
+                throw new ConfigException(key.toString(), originals.get(key), "Key must be a string.");
+        this.originals = (Map<String, ?>) originals;
+        this.values = definition.parse(this.originals);
+        this.used = Collections.synchronizedSet(new HashSet<String>());
+    }
+
+    protected Object get(String key) {
+        if (!values.containsKey(key))
+            throw new ConfigException(String.format("Unknown configuration '%s'", key));
+        used.add(key);
+        return values.get(key);
+    }
+
+    public int getInt(String key) {
+        return (Integer) get(key);
+    }
+
+    public Long getLong(String key) {
+        return (Long) get(key);
+    }
+
+    @SuppressWarnings("unchecked")
+    public List<String> getList(String key) {
+        return (List<String>) get(key);
+    }
+
+    public boolean getBoolean(String key) {
+        return (Boolean) get(key);
+    }
+
+    public String getString(String key) {
+        return (String) get(key);
+    }
+
+    public Class<?> getClass(String key) {
+        return (Class<?>) get(key);
+    }
+
+    public Set<String> unused() {
+        Set<String> keys = new HashSet<String>(originals.keySet());
+        keys.removeAll(used);
+        return keys;
+    }
+
+    /**
+     * Get a configured instance of the given class specified by the given configuration key. If the object implements
+     * Configurable, configure it using the configuration.
+     * 
+     * @param key The configuration key for the class
+     * @param t The interface the class should implement
+     * @return A configured instance of the class
+     */
+    public <T> T getConfiguredInstance(String key, Class<T> t) {
+        Class<?> c = getClass(key);
+        if (c == null)
+            return null;
+        Object o = Utils.newInstance(c);
+        if (!t.isInstance(o))
+            throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
+        if (o instanceof Configurable)
+            ((Configurable) o).configure(this.originals);
+        return t.cast(o);
+    }
+
+}
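
A sketch of the intended subclassing pattern (class and config names here are hypothetical;
the real ProducerConfig elsewhere in this patch presumably follows the same shape):

    public class MyClientConfig extends AbstractConfig {
        private static final ConfigDef DEF = new ConfigDef()
            .define("broker.list", ConfigDef.Type.LIST, "Bootstrap broker host:port pairs.")
            .define("send.buffer.bytes", ConfigDef.Type.INT, 64 * 1024, "Socket send buffer size.");

        public MyClientConfig(Map<?, ?> props) {
            super(DEF, props);
        }
    }

    // int sendBuffer = new MyClientConfig(props).getInt("send.buffer.bytes");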

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/config/ConfigDef.java b/clients/src/main/java/kafka/common/config/ConfigDef.java
new file mode 100644
index 0000000..2507c9c
--- /dev/null
+++ b/clients/src/main/java/kafka/common/config/ConfigDef.java
@@ -0,0 +1,253 @@
+package kafka.common.config;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is used for specifying the set of expected configurations, their type, their defaults, their
+ * documentation, and any special validation logic used for checking the correctness of the values the user provides.
+ * <p>
+ * Usage of this class looks something like this:
+ * 
+ * <pre>
+ * ConfigDef defs = new ConfigDef();
+ * defs.define(&quot;config_name&quot;, Type.STRING, &quot;default string value&quot;, &quot;This configuration is used for blah blah blah.&quot;);
+ * defs.define(&quot;another_config_name&quot;, Type.INT, 42, Range.atLeast(0), &quot;More documentation on this config&quot;);
+ * 
+ * Properties props = new Properties();
+ * props.setProperty(&quot;config_name&quot;, &quot;some value&quot;);
+ * Map&lt;String, Object&gt; configs = defs.parse(props);
+ * 
+ * String someConfig = (String) configs.get(&quot;config_name&quot;); // will return &quot;some value&quot;
+ * int anotherConfig = (Integer) configs.get(&quot;another_config_name&quot;); // will return default value of 42
+ * </pre>
+ * 
+ * This class can be used stand-alone or in combination with {@link AbstractConfig} which provides some additional
+ * functionality for accessing configs.
+ */
+public class ConfigDef {
+
+    private static final Object NO_DEFAULT_VALUE = new Object();
+
+    private final Map<String, ConfigKey> configKeys = new HashMap<String, ConfigKey>();
+
+    /**
+     * Define a new configuration
+     * @param name The name of the config parameter
+     * @param type The type of the config
+     * @param defaultValue The default value to use if this config isn't present
+     * @param validator A validator to use in checking the correctness of the config
+     * @param documentation The documentation string for the config
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, String documentation) {
+        if (configKeys.containsKey(name))
+            throw new ConfigException("Configuration " + name + " is defined twice.");
+        Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
+        configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, documentation));
+        return this;
+    }
+
+    /**
+     * Define a new configuration with no special validation logic
+     * @param name The name of the config parameter
+     * @param type The type of the config
+     * @param defaultValue The default value to use if this config isn't present
+     * @param documentation The documentation string for the config
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, String documentation) {
+        return define(name, type, defaultValue, null, documentation);
+    }
+
+    /**
+     * Define a required parameter with no default value
+     * @param name The name of the config parameter
+     * @param type The type of the config
+     * @param validator A validator to use in checking the correctness of the config
+     * @param documentation The documentation string for the config
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Validator validator, String documentation) {
+        return define(name, type, NO_DEFAULT_VALUE, validator, documentation);
+    }
+
+    /**
+     * Define a required parameter with no default value and no special validation logic
+     * @param name The name of the config parameter
+     * @param type The type of the config
+     * @param documentation The documentation string for the config
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, String documentation) {
+        return define(name, type, NO_DEFAULT_VALUE, null, documentation);
+    }
+
+    /**
+     * Parse and validate configs against this configuration definition. The input is a map of configs. It is expected
+     * that the keys of the map are strings, but the values can either be strings or they may already be of the
+     * appropriate type (int, string, etc). This will work equally well with either java.util.Properties instances or a
+     * programmatically constructed map.
+     * @param props The configs to parse and validate
+     * @return Parsed and validated configs. The key will be the config name and the value will be the value parsed into
+     *         the appropriate type (int, string, etc)
+     */
+    public Map<String, Object> parse(Map<?, ?> props) {
+        /* parse all known keys */
+        Map<String, Object> values = new HashMap<String, Object>();
+        for (ConfigKey key : configKeys.values()) {
+            Object value;
+            if (props.containsKey(key.name))
+                value = parseType(key.name, props.get(key.name), key.type);
+            else if (key.defaultValue == NO_DEFAULT_VALUE)
+                throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
+            else
+                value = key.defaultValue;
+            if (key.validator != null)
+                key.validator.ensureValid(key.name, value);
+            values.put(key.name, value);
+        }
+        return values;
+    }
+
+    /**
+     * Parse a value according to its expected type.
+     * @param name The config name
+     * @param value The config value
+     * @param type The expected type
+     * @return The parsed object
+     */
+    private Object parseType(String name, Object value, Type type) {
+        try {
+            String trimmed = null;
+            if (value instanceof String)
+                trimmed = ((String) value).trim();
+            switch (type) {
+                case BOOLEAN:
+                    if (value instanceof String)
+                        return Boolean.parseBoolean(trimmed);
+                    else if (value instanceof Boolean)
+                        return value;
+                    else
+                        throw new ConfigException(name, value, "Expected value to be either true or false");
+                case STRING:
+                    if (value instanceof String)
+                        return trimmed;
+                    else
+                        throw new ConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName());
+                case INT:
+                    if (value instanceof Integer) {
+                        return (Integer) value;
+                    } else if (value instanceof String) {
+                        return Integer.parseInt(trimmed);
+                    } else {
+                        throw new ConfigException(name, value, "Expected value to be a number.");
+                    }
+                case LONG:
+                    if (value instanceof Integer)
+                        return ((Integer) value).longValue();
+                    else if (value instanceof Long)
+                        return (Long) value;
+                    else if (value instanceof String)
+                        return Long.parseLong(trimmed);
+                    else
+                        throw new ConfigException(name, value, "Expected value to be a number.");
+                case DOUBLE:
+                    if (value instanceof Number)
+                        return ((Number) value).doubleValue();
+                    else if (value instanceof String)
+                        return Double.parseDouble(trimmed);
+                    else
+                        throw new ConfigException(name, value, "Expected value to be a number.");
+                case LIST:
+                    if (value instanceof List)
+                        return (List<?>) value;
+                    else if (value instanceof String)
+                        return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
+                    else
+                        throw new ConfigException(name, value, "Expected a comma-separated list.");
+                case CLASS:
+                    if (value instanceof Class)
+                        return (Class<?>) value;
+                    else if (value instanceof String)
+                        return Class.forName(trimmed);
+                    else
+                        throw new ConfigException(name, value, "Expected a Class instance or class name.");
+                default:
+                    throw new IllegalStateException("Unknown type.");
+            }
+        } catch (NumberFormatException e) {
+            throw new ConfigException(name, value, "Not a number of type " + type);
+        } catch (ClassNotFoundException e) {
+            throw new ConfigException(name, value, "Class " + value + " could not be found.");
+        }
+    }
+
+    /**
+     * The config types
+     */
+    public enum Type {
+        BOOLEAN, STRING, INT, LONG, DOUBLE, LIST, CLASS;
+    }
+
+    /**
+     * Validation logic the user may provide
+     */
+    public interface Validator {
+        public void ensureValid(String name, Object o);
+    }
+
+    /**
+     * Validation logic for numeric ranges
+     */
+    public static class Range implements Validator {
+        private final Number min;
+        private final Number max;
+
+        private Range(Number min, Number max) {
+            this.min = min;
+            this.max = max;
+        }
+
+        /**
+         * A numeric range that checks only the lower bound
+         * @param min The minimum acceptable value
+         */
+        public static Range atLeast(Number min) {
+            return new Range(min, Double.MAX_VALUE);
+        }
+
+        /**
+         * A numeric range that checks both the upper and lower bound
+         * @param min The minimum acceptable value
+         * @param max The maximum acceptable value
+         */
+        public static Range between(Number min, Number max) {
+            return new Range(min, max);
+        }
+
+        public void ensureValid(String name, Object o) {
+            Number n = (Number) o;
+            if (n.doubleValue() < min.doubleValue() || n.doubleValue() > max.doubleValue())
+                throw new ConfigException(name, o, "Value must be in the range [" + min + ", " + max + "]");
+        }
+    }
+
+    private static class ConfigKey {
+        public final String name;
+        public final Type type;
+        public final String documentation;
+        public final Object defaultValue;
+        public final Validator validator;
+
+        public ConfigKey(String name, Type type, Object defaultValue, Validator validator, String documentation) {
+            super();
+            this.name = name;
+            this.type = type;
+            this.defaultValue = defaultValue;
+            this.validator = validator;
+            if (this.validator != null)
+                this.validator.ensureValid(name, defaultValue);
+            this.documentation = documentation;
+        }
+
+    }
+}
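
A usage sketch of the pieces above: the ConfigKey constructor and the parseType logic imply a
define-then-parse workflow. The define() and parse() signatures below are assumptions based on
the fields shown in this file, not confirmed by the diff.

    // Minimal sketch; define(...) and parse(Map) are assumed method names.
    ConfigDef def = new ConfigDef()
        .define("batch.size", ConfigDef.Type.INT, 16384,
                ConfigDef.Range.atLeast(0), "The batch size in bytes.");

    Map<String, Object> props = new HashMap<String, Object>();
    props.put("batch.size", " 1024 ");                      // strings are trimmed, then parsed per Type
    Map<String, Object> parsed = def.parse(props);
    Integer batchSize = (Integer) parsed.get("batch.size"); // 1024 (assuming parse applies the validators)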

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/config/ConfigException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/config/ConfigException.java b/clients/src/main/java/kafka/common/config/ConfigException.java
new file mode 100644
index 0000000..fad141e
--- /dev/null
+++ b/clients/src/main/java/kafka/common/config/ConfigException.java
@@ -0,0 +1,24 @@
+package kafka.common.config;
+
+import kafka.common.KafkaException;
+
+/**
+ * Thrown if the user supplies an invalid configuration
+ */
+public class ConfigException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ConfigException(String message) {
+        super(message);
+    }
+
+    public ConfigException(String name, Object value) {
+        this(name, value, null);
+    }
+
+    public ConfigException(String name, Object value, String message) {
+        super("Invalid value " + value + " for configuration " + name + (message == null ? "" : ": " + message));
+    }
+
+}
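
The three-argument constructor standardizes the message format; a quick sketch (the configuration
name and value are illustrative):

    // Message: "Invalid value -1 for configuration batch.size: Value must be at least 0"
    throw new ConfigException("batch.size", -1, "Value must be at least 0");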

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/errors/ApiException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/ApiException.java b/clients/src/main/java/kafka/common/errors/ApiException.java
new file mode 100644
index 0000000..28f5411
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/ApiException.java
@@ -0,0 +1,35 @@
+package kafka.common.errors;
+
+import kafka.common.KafkaException;
+
+/**
+ * Any API exception that is part of the public protocol should be a subclass of this class and be part of this
+ * package.
+ */
+public abstract class ApiException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ApiException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ApiException(String message) {
+        super(message);
+    }
+
+    public ApiException(Throwable cause) {
+        super(cause);
+    }
+
+    public ApiException() {
+        super();
+    }
+
+    /* avoid the expensive and useless stack trace for API exceptions */
+    @Override
+    public Throwable fillInStackTrace() {
+        return this;
+    }
+
+}
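
The fillInStackTrace() override makes these exceptions cheap to construct, at the cost of carrying
no stack trace; a small sketch of the observable effect, using the TimeoutException defined later
in this patch:

    ApiException e = new TimeoutException("request timed out");
    System.out.println(e.getStackTrace().length); // prints 0: no trace was captured at construction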

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/errors/CorruptMessageException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/CorruptMessageException.java b/clients/src/main/java/kafka/common/errors/CorruptMessageException.java
new file mode 100644
index 0000000..faf6234
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/CorruptMessageException.java
@@ -0,0 +1,23 @@
+package kafka.common.errors;
+
+public class CorruptMessageException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public CorruptMessageException() {
+        super("This message has failed it's CRC checksum or is otherwise corrupt.");
+    }
+
+    public CorruptMessageException(String message) {
+        super(message);
+    }
+
+    public CorruptMessageException(Throwable cause) {
+        super(cause);
+    }
+
+    public CorruptMessageException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java b/clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java
new file mode 100644
index 0000000..d7b86f8
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java
@@ -0,0 +1,19 @@
+package kafka.common.errors;
+
+public class LeaderNotAvailableException extends RetryableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public LeaderNotAvailableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public LeaderNotAvailableException(String message) {
+        super(message);
+    }
+
+    public LeaderNotAvailableException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/errors/MessageTooLargeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/MessageTooLargeException.java b/clients/src/main/java/kafka/common/errors/MessageTooLargeException.java
new file mode 100644
index 0000000..7417906
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/MessageTooLargeException.java
@@ -0,0 +1,23 @@
+package kafka.common.errors;
+
+public class MessageTooLargeException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public MessageTooLargeException() {
+        super();
+    }
+
+    public MessageTooLargeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public MessageTooLargeException(String message) {
+        super(message);
+    }
+
+    public MessageTooLargeException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/errors/NetworkException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/NetworkException.java b/clients/src/main/java/kafka/common/errors/NetworkException.java
new file mode 100644
index 0000000..daedbf4
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/NetworkException.java
@@ -0,0 +1,23 @@
+package kafka.common.errors;
+
+public class NetworkException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public NetworkException() {
+        super();
+    }
+
+    public NetworkException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public NetworkException(String message) {
+        super(message);
+    }
+
+    public NetworkException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java b/clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java
new file mode 100644
index 0000000..5d750fd
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java
@@ -0,0 +1,23 @@
+package kafka.common.errors;
+
+public class NotLeaderForPartitionException extends RetryableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public NotLeaderForPartitionException() {
+        super();
+    }
+
+    public NotLeaderForPartitionException(String message) {
+        super(message);
+    }
+
+    public NotLeaderForPartitionException(Throwable cause) {
+        super(cause);
+    }
+
+    public NotLeaderForPartitionException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java b/clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java
new file mode 100644
index 0000000..ab9cd62
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java
@@ -0,0 +1,22 @@
+package kafka.common.errors;
+
+public class OffsetMetadataTooLarge extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public OffsetMetadataTooLarge() {
+    }
+
+    public OffsetMetadataTooLarge(String message) {
+        super(message);
+    }
+
+    public OffsetMetadataTooLarge(Throwable cause) {
+        super(cause);
+    }
+
+    public OffsetMetadataTooLarge(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java b/clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java
new file mode 100644
index 0000000..93210cd
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java
@@ -0,0 +1,22 @@
+package kafka.common.errors;
+
+public class OffsetOutOfRangeException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public OffsetOutOfRangeException() {
+    }
+
+    public OffsetOutOfRangeException(String message) {
+        super(message);
+    }
+
+    public OffsetOutOfRangeException(Throwable cause) {
+        super(cause);
+    }
+
+    public OffsetOutOfRangeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/errors/RetryableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/RetryableException.java b/clients/src/main/java/kafka/common/errors/RetryableException.java
new file mode 100644
index 0000000..5aa8684
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/RetryableException.java
@@ -0,0 +1,31 @@
+package kafka.common.errors;
+
+/**
+ * A retryable exception is an exception that is safe to retry. To be retryable an exception should be
+ * <ol>
+ * <li>Transient: there is no point retrying an error due to a non-existent topic or an oversized message
+ * <li>Idempotent: the exception is known not to change any state on the server
+ * </ol>
+ * A client may choose to retry any request it likes, but exceptions extending this class are always safe and sane to
+ * retry.
+ */
+public abstract class RetryableException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public RetryableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public RetryableException(String message) {
+        super(message);
+    }
+
+    public RetryableException(Throwable cause) {
+        super(cause);
+    }
+
+    public RetryableException() {
+    }
+
+}
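
A sketch of the client-side pattern this contract enables: retry transparently on
RetryableException, propagate everything else. The doSend() call and the retry limit are
illustrative, not part of this patch.

    int maxRetries = 3;                    // illustrative
    for (int attempt = 0; attempt < maxRetries; attempt++) {
        try {
            doSend();                      // hypothetical request that may fail transiently
            break;                         // success
        } catch (RetryableException e) {
            // transient and idempotent by contract: safe to try again
        } catch (ApiException e) {
            throw e;                       // not known to be safe to retry
        }
    }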

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/errors/TimeoutException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/TimeoutException.java b/clients/src/main/java/kafka/common/errors/TimeoutException.java
new file mode 100644
index 0000000..da27a98
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/TimeoutException.java
@@ -0,0 +1,23 @@
+package kafka.common.errors;
+
+public class TimeoutException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TimeoutException() {
+        super();
+    }
+
+    public TimeoutException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public TimeoutException(String message) {
+        super(message);
+    }
+
+    public TimeoutException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/errors/UnknownServerException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/UnknownServerException.java b/clients/src/main/java/kafka/common/errors/UnknownServerException.java
new file mode 100644
index 0000000..d0b56d6
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/UnknownServerException.java
@@ -0,0 +1,22 @@
+package kafka.common.errors;
+
+public class UnknownServerException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public UnknownServerException() {
+    }
+
+    public UnknownServerException(String message) {
+        super(message);
+    }
+
+    public UnknownServerException(Throwable cause) {
+        super(cause);
+    }
+
+    public UnknownServerException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java b/clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java
new file mode 100644
index 0000000..5c1ca12
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java
@@ -0,0 +1,22 @@
+package kafka.common.errors;
+
+public class UnknownTopicOrPartitionException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public UnknownTopicOrPartitionException() {
+    }
+
+    public UnknownTopicOrPartitionException(String message) {
+        super(message);
+    }
+
+    public UnknownTopicOrPartitionException(Throwable throwable) {
+        super(throwable);
+    }
+
+    public UnknownTopicOrPartitionException(String message, Throwable throwable) {
+        super(message, throwable);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/CompoundStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/CompoundStat.java b/clients/src/main/java/kafka/common/metrics/CompoundStat.java
new file mode 100644
index 0000000..5541e32
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/CompoundStat.java
@@ -0,0 +1,40 @@
+package kafka.common.metrics;
+
+import java.util.List;
+
+/**
+ * A compound stat is a stat where a single measurement and associated data structure feeds many metrics. The
+ * canonical example is a histogram, which has many associated percentiles.
+ */
+public interface CompoundStat extends Stat {
+
+    public List<NamedMeasurable> stats();
+
+    public static class NamedMeasurable {
+
+        private final String name;
+        private final String description;
+        private final Measurable stat;
+
+        public NamedMeasurable(String name, String description, Measurable stat) {
+            super();
+            this.name = name;
+            this.description = description;
+            this.stat = stat;
+        }
+
+        public String name() {
+            return name;
+        }
+
+        public String description() {
+            return description;
+        }
+
+        public Measurable stat() {
+            return stat;
+        }
+
+    }
+
+}
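
A sketch of a compound stat whose single stream of recorded values feeds two metrics, a minimum
and a maximum. It assumes Stat declares record(MetricConfig, double, long), which this diff does
not show:

    public class MinMax implements CompoundStat {
        private double min = Double.POSITIVE_INFINITY;
        private double max = Double.NEGATIVE_INFINITY;

        public void record(MetricConfig config, double value, long time) {
            this.min = Math.min(this.min, value);
            this.max = Math.max(this.max, value);
        }

        public List<NamedMeasurable> stats() {
            List<NamedMeasurable> metrics = new ArrayList<NamedMeasurable>();
            metrics.add(new NamedMeasurable("min", "The minimum value recorded.", new Measurable() {
                public double measure(MetricConfig config, long now) {
                    return min;
                }
            }));
            metrics.add(new NamedMeasurable("max", "The maximum value recorded.", new Measurable() {
                public double measure(MetricConfig config, long now) {
                    return max;
                }
            }));
            return metrics;
        }
    }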

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/kafka/common/metrics/JmxReporter.java
new file mode 100644
index 0000000..a0cee01
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/JmxReporter.java
@@ -0,0 +1,184 @@
+package kafka.common.metrics;
+
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.InvalidAttributeValueException;
+import javax.management.JMException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import kafka.common.KafkaException;
+
+/**
+ * Register metrics in JMX as dynamic MBeans based on the metric names
+ */
+public class JmxReporter implements MetricsReporter {
+
+    private final String prefix;
+    private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();
+
+    public JmxReporter() {
+        this("");
+    }
+
+    /**
+     * Create a JMX reporter that prefixes all metrics with the given string.
+     */
+    public JmxReporter(String prefix) {
+        this.prefix = prefix;
+    }
+
+    @Override
+    public synchronized void init(List<KafkaMetric> metrics) {
+        for (KafkaMetric metric : metrics)
+            addAttribute(metric);
+        for (KafkaMbean mbean : mbeans.values())
+            reregister(mbean);
+
+    }
+
+    @Override
+    public synchronized void metricChange(KafkaMetric metric) {
+        KafkaMbean mbean = addAttribute(metric);
+        reregister(mbean);
+    }
+
+    private KafkaMbean addAttribute(KafkaMetric metric) {
+        try {
+            String[] names = split(prefix + metric.name());
+            String qualifiedName = names[0] + "." + names[1];
+            if (!this.mbeans.containsKey(qualifiedName))
+                mbeans.put(qualifiedName, new KafkaMbean(names[0], names[1]));
+            KafkaMbean mbean = this.mbeans.get(qualifiedName);
+            mbean.setAttribute(names[2], metric);
+            return mbean;
+        } catch (JMException e) {
+            throw new KafkaException("Error creating mbean attribute " + metric.name(), e);
+        }
+    }
+
+    public synchronized void close() {
+        for (KafkaMbean mbean : this.mbeans.values())
+            unregister(mbean);
+
+    }
+
+    private void unregister(KafkaMbean mbean) {
+        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+        try {
+            if (server.isRegistered(mbean.name()))
+                server.unregisterMBean(mbean.name());
+        } catch (JMException e) {
+            throw new KafkaException("Error unregistering mbean", e);
+        }
+    }
+
+    private void reregister(KafkaMbean mbean) {
+        unregister(mbean);
+        try {
+            ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, mbean.name());
+        } catch (JMException e) {
+            throw new KafkaException("Error registering mbean " + mbean.name(), e);
+        }
+    }
+
+    private String[] split(String name) {
+        int attributeStart = name.lastIndexOf('.');
+        if (attributeStart < 0)
+            throw new IllegalArgumentException("No MBean name in metric name: " + name);
+        String attributeName = name.substring(attributeStart + 1, name.length());
+        String remainder = name.substring(0, attributeStart);
+        int beanStart = remainder.lastIndexOf('.');
+        if (beanStart < 0)
+            return new String[] { "", remainder, attributeName };
+        String packageName = remainder.substring(0, beanStart);
+        String beanName = remainder.substring(beanStart + 1, remainder.length());
+        return new String[] { packageName, beanName, attributeName };
+    }
+
+    private static class KafkaMbean implements DynamicMBean {
+        private final String beanName;
+        private final ObjectName objectName;
+        private final Map<String, KafkaMetric> metrics;
+
+        public KafkaMbean(String packageName, String beanName) throws MalformedObjectNameException {
+            this.beanName = beanName;
+            this.metrics = new HashMap<String, KafkaMetric>();
+            this.objectName = new ObjectName(packageName + ":type=" + beanName);
+        }
+
+        public ObjectName name() {
+            return objectName;
+        }
+
+        public void setAttribute(String name, KafkaMetric metric) {
+            this.metrics.put(name, metric);
+        }
+
+        @Override
+        public Object getAttribute(String name) throws AttributeNotFoundException, MBeanException, ReflectionException {
+            if (this.metrics.containsKey(name))
+                return this.metrics.get(name).value();
+            else
+                throw new AttributeNotFoundException("Could not find attribute " + name);
+        }
+
+        @Override
+        public AttributeList getAttributes(String[] names) {
+            try {
+                AttributeList list = new AttributeList();
+                for (String name : names)
+                    list.add(new Attribute(name, getAttribute(name)));
+                return list;
+            } catch (Exception e) {
+                e.printStackTrace();
+                return new AttributeList();
+            }
+        }
+
+        @Override
+        public MBeanInfo getMBeanInfo() {
+            MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[metrics.size()];
+            int i = 0;
+            for (Map.Entry<String, KafkaMetric> entry : this.metrics.entrySet()) {
+                String attribute = entry.getKey();
+                KafkaMetric metric = entry.getValue();
+                attrs[i] = new MBeanAttributeInfo(attribute, double.class.getName(), metric.description(), true, false, false);
+                i += 1;
+            }
+            return new MBeanInfo(beanName, "", attrs, null, null, null);
+        }
+
+        @Override
+        public Object invoke(String name, Object[] params, String[] sig) throws MBeanException, ReflectionException {
+            throw new UnsupportedOperationException("Set not allowed.");
+        }
+
+        @Override
+        public void setAttribute(Attribute attribute) throws AttributeNotFoundException,
+                                                     InvalidAttributeValueException,
+                                                     MBeanException,
+                                                     ReflectionException {
+            throw new UnsupportedOperationException("Set not allowed.");
+        }
+
+        @Override
+        public AttributeList setAttributes(AttributeList list) {
+            throw new UnsupportedOperationException("Set not allowed.");
+        }
+
+    }
+
+}
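
The split() method encodes the naming convention: the last dot-separated segment of a metric name
becomes the MBean attribute, the segment before it the bean name, and everything earlier the JMX
domain. For an illustrative metric name:

    // "kafka.producer.records-per-sec" is exposed as:
    //   ObjectName: "kafka:type=producer"
    //   Attribute:  "records-per-sec"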

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/kafka/common/metrics/KafkaMetric.java
new file mode 100644
index 0000000..33212b0
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/KafkaMetric.java
@@ -0,0 +1,55 @@
+package kafka.common.metrics;
+
+import kafka.common.Metric;
+import kafka.common.utils.Time;
+
+public final class KafkaMetric implements Metric {
+
+    private final String name;
+    private final String description;
+    private final Object lock;
+    private final Time time;
+    private final Measurable measurable;
+    private MetricConfig config;
+
+    KafkaMetric(Object lock, String name, String description, Measurable measurable, MetricConfig config, Time time) {
+        super();
+        this.name = name;
+        this.description = description;
+        this.lock = lock;
+        this.measurable = measurable;
+        this.config = config;
+        this.time = time;
+    }
+
+    MetricConfig config() {
+        return this.config;
+    }
+
+    @Override
+    public String name() {
+        return this.name;
+    }
+
+    @Override
+    public String description() {
+        return this.description;
+    }
+
+    @Override
+    public double value() {
+        synchronized (this.lock) {
+            return value(time.nanoseconds());
+        }
+    }
+
+    double value(long time) {
+        return this.measurable.measure(config, time);
+    }
+
+    public void config(MetricConfig config) {
+        synchronized (lock) {
+            this.config = config;
+        }
+    }
+}
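
Both value() and config(MetricConfig) synchronize on the same lock object, so a metric can be
reconfigured at runtime without a reader ever observing a half-applied config; a sketch, where
metric is a handle assumed to come from the metrics registry (not shown here):

    metric.config(new MetricConfig().samples(4)); // swap the config atomically
    double v = metric.value();                    // measured under the new config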

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/Measurable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Measurable.java b/clients/src/main/java/kafka/common/metrics/Measurable.java
new file mode 100644
index 0000000..f5511ea
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/Measurable.java
@@ -0,0 +1,16 @@
+package kafka.common.metrics;
+
+/**
+ * A measurable quantity that can be registered as a metric
+ */
+public interface Measurable {
+
+    /**
+     * Measure this quantity and return the result as a double
+     * @param config The configuration for this metric
+     * @param now The time the measurement is being taken
+     * @return The measured value
+     */
+    public double measure(MetricConfig config, long now);
+
+}
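
Since Measurable is a single-method interface, an anonymous class (this codebase predates lambdas)
makes a natural inline gauge; a sketch with an illustrative queue:

    final Queue<ByteBuffer> queue = new ArrayDeque<ByteBuffer>();
    Measurable queueDepth = new Measurable() {
        public double measure(MetricConfig config, long now) {
            return queue.size(); // a gauge can simply read current state
        }
    };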

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/MeasurableStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/MeasurableStat.java b/clients/src/main/java/kafka/common/metrics/MeasurableStat.java
new file mode 100644
index 0000000..74d3bb4
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/MeasurableStat.java
@@ -0,0 +1,10 @@
+package kafka.common.metrics;
+
+/**
+ * A MeasurableStat is a {@link Stat} that is also {@link Measurable} (i.e. can produce a single floating point value).
+ * This is the interface used for most of the simple statistics such as {@link kafka.common.metrics.stats.Avg},
+ * {@link kafka.common.metrics.stats.Max}, {@link kafka.common.metrics.stats.Count}, etc.
+ */
+public interface MeasurableStat extends Stat, Measurable {
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/MetricConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/MetricConfig.java b/clients/src/main/java/kafka/common/metrics/MetricConfig.java
new file mode 100644
index 0000000..92f67f0
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/MetricConfig.java
@@ -0,0 +1,71 @@
+package kafka.common.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Configuration values for metrics
+ */
+public class MetricConfig {
+
+    private Quota quota;
+    private int samples;
+    private long eventWindow;
+    private long timeWindowNs;
+    private TimeUnit unit;
+
+    public MetricConfig() {
+        super();
+        this.quota = null;
+        this.samples = 2;
+        this.eventWindow = Long.MAX_VALUE;
+        this.timeWindowNs = TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
+        this.unit = TimeUnit.SECONDS;
+    }
+
+    public Quota quota() {
+        return this.quota;
+    }
+
+    public MetricConfig quota(Quota quota) {
+        this.quota = quota;
+        return this;
+    }
+
+    public long eventWindow() {
+        return eventWindow;
+    }
+
+    public MetricConfig eventWindow(long window) {
+        this.eventWindow = window;
+        return this;
+    }
+
+    public long timeWindowNs() {
+        return timeWindowNs;
+    }
+
+    public MetricConfig timeWindow(long window, TimeUnit unit) {
+        this.timeWindowNs = TimeUnit.NANOSECONDS.convert(window, unit);
+        return this;
+    }
+
+    public int samples() {
+        return this.samples;
+    }
+
+    public MetricConfig samples(int samples) {
+        if (samples < 1)
+            throw new IllegalArgumentException("The number of samples must be at least 1.");
+        this.samples = samples;
+        return this;
+    }
+
+    public TimeUnit timeUnit() {
+        return unit;
+    }
+
+    public MetricConfig timeUnit(TimeUnit unit) {
+        this.unit = unit;
+        return this;
+    }
+}
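
Because each setter returns this, configs chain fluently; a minimal sketch using only the methods
shown in this file:

    MetricConfig config = new MetricConfig()
        .samples(4)                        // must be at least 1
        .timeWindow(30, TimeUnit.SECONDS)  // converted to nanoseconds internally
        .timeUnit(TimeUnit.MILLISECONDS);  // overrides the default of SECONDS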