You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/07 01:26:38 UTC
[07/13] Rename client package from kafka.* to org.apache.kafka.*
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
new file mode 100644
index 0000000..5a81f35
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -0,0 +1,505 @@
+package org.apache.kafka.clients.producer.internals;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.utils.Time;
+
+
+/**
+ * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
+ * requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
+ */
+public class Sender implements Runnable {
+
+ // connection state per node id; entries are created when a connection attempt is initiated
+ private final Map<Integer, NodeState> nodeState;
+ // source of the record batches that get drained into produce requests
+ private final RecordAccumulator accumulator;
+ // performs the network I/O and reports completed sends/receives, connects and disconnects
+ private final Selectable selector;
+ // client id placed in every request header
+ private final String clientId;
+ // size limit handed to the accumulator when draining batches
+ private final int maxRequestSize;
+ // minimum time between connection attempts to a disconnected node
+ private final long reconnectBackoffMs;
+ // value of the "acks" field in produce requests; 0 means no response is expected
+ private final short acks;
+ // value of the "timeout" field in produce requests
+ private final int requestTimeout;
+ // requests that have been handed to the selector and are not yet completed, per node
+ private final InFlightRequests inFlightRequests;
+ private final Metadata metadata;
+ private final Time time;
+ // correlation id for the next request; incremented for each request header created
+ private int correlation;
+ // set when a metadata request is created, cleared when a metadata response is handled
+ private boolean metadataFetchInProgress;
+ // cleared by initiateClose() to make the main loop in run() exit
+ private volatile boolean running;
+
+    /**
+     * Create a sender in the running state with no known connections and no in-flight requests.
+     *
+     * @param selector The selector used for all network I/O
+     * @param metadata The cluster metadata that is consulted and refreshed by this sender
+     * @param accumulator The accumulator the record batches are drained from
+     * @param clientId The client id written into each request header
+     * @param maxRequestSize The size limit passed to the accumulator when draining
+     * @param reconnectBackoffMs The time to wait before retrying a connection to a disconnected node
+     * @param acks The "acks" value of produce requests (0 means no response is expected)
+     * @param requestTimeout The "timeout" value of produce requests
+     * @param time The clock used by the run loop
+     */
+    public Sender(Selectable selector,
+                  Metadata metadata,
+                  RecordAccumulator accumulator,
+                  String clientId,
+                  int maxRequestSize,
+                  long reconnectBackoffMs,
+                  short acks,
+                  int requestTimeout,
+                  Time time) {
+        // collaborators supplied by the caller
+        this.selector = selector;
+        this.metadata = metadata;
+        this.accumulator = accumulator;
+        this.time = time;
+
+        // request settings
+        this.clientId = clientId;
+        this.maxRequestSize = maxRequestSize;
+        this.reconnectBackoffMs = reconnectBackoffMs;
+        this.acks = acks;
+        this.requestTimeout = requestTimeout;
+
+        // internal bookkeeping
+        this.nodeState = new HashMap<Integer, NodeState>();
+        this.inFlightRequests = new InFlightRequests();
+        this.correlation = 0;
+        this.metadataFetchInProgress = false;
+        this.running = true;
+    }
+
+    /**
+     * Entry point of the sender thread: iterate until a close is requested, then keep iterating until an iteration
+     * reports no ready partitions, and finally close the selector.
+     */
+    public void run() {
+        // steady state, left once initiateClose() has cleared the flag
+        while (running) {
+            try {
+                run(time.milliseconds());
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+        // flush phase: always performs at least one more iteration; a failed iteration keeps the previous count
+        int remaining = 0;
+        boolean drained = false;
+        while (!drained) {
+            try {
+                remaining = run(time.milliseconds());
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+            drained = remaining <= 0;
+        }
+
+        // nothing left to send, release the connections
+        this.selector.close();
+    }
+
+    /**
+     * Perform one send iteration: pick the partitions that can be written to, build the metadata and produce
+     * requests, poll the selector and process whatever it completed.
+     *
+     * @param now The current time
+     * @return The total number of topic/partitions that had data ready (regardless of what we actually sent)
+     */
+    public int run(long now) {
+        Cluster cluster = metadata.fetch();
+
+        // partitions with data ready, narrowed down to those whose leader we can send to right now
+        List<TopicPartition> ready = this.accumulator.ready(now);
+        List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
+
+        List<NetworkSend> sends = new ArrayList<NetworkSend>(sendable.size());
+
+        // add a metadata request if a refresh is due
+        InFlightRequest metadataReq = maybeMetadataRequest(cluster, now);
+        if (metadataReq != null) {
+            sends.add(metadataReq.request);
+            this.inFlightRequests.add(metadataReq);
+        }
+
+        // turn the drained batches into one produce request per node
+        List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize);
+        for (InFlightRequest produceReq : collate(cluster, batches)) {
+            this.inFlightRequests.add(produceReq);
+            sends.add(produceReq.request);
+        }
+
+        // network I/O
+        try {
+            this.selector.poll(5L, sends);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        // react to what the selector reported for this poll
+        handleSends(this.selector.completedSends());
+        handleResponses(this.selector.completedReceives(), now);
+        handleDisconnects(this.selector.disconnected());
+        handleConnects(this.selector.connected());
+
+        return ready.size();
+    }
+
+    /**
+     * Build a metadata request if a refresh is due and none is outstanding. The target is the next node of the
+     * cluster's round-robin; if that node has no usable connection a connect is started (when allowed by the
+     * backoff) and no request is created in this iteration.
+     *
+     * @return The metadata request to send, or null if none should be sent now
+     */
+    private InFlightRequest maybeMetadataRequest(Cluster cluster, long now) {
+        boolean refreshDue = !this.metadataFetchInProgress && metadata.needsUpdate(now);
+        if (!refreshDue)
+            return null;
+
+        Node target = cluster.nextNode();
+        NodeState state = nodeState.get(target.id());
+        boolean neverConnected = state == null;
+        boolean backoffElapsed = !neverConnected
+                                 && state.state == ConnectionState.DISCONNECTED
+                                 && now - state.lastConnectAttempt > this.reconnectBackoffMs;
+        if (neverConnected || backoffElapsed) {
+            // we don't have a connection to this node right now, make one
+            initiateConnect(target, now);
+            return null;
+        }
+        if (state.state != ConnectionState.CONNECTED)
+            return null;
+
+        this.metadataFetchInProgress = true;
+        return metadataRequest(target.id(), metadata.topics());
+    }
+
+ /**
+ * Start closing the sender (won't actually complete until all data is sent out). This only requests the shutdown;
+ * the thread executing {@link #run()} performs it.
+ */
+ public void initiateClose() {
+ // makes the main loop in run() exit; run() then keeps iterating until an iteration reports nothing ready
+ this.running = false;
+ // NOTE(review): presumably stops the accumulator from accepting new records -- confirm in RecordAccumulator
+ this.accumulator.close();
+ }
+
+    /**
+     * Filter the ready topic-partitions down to those that can be sent to right now, i.e. whose leader is connected
+     * and can take another request. For a partition without a known leader a metadata update is forced; for a leader
+     * without a usable connection a connect is initiated (subject to the reconnect backoff).
+     */
+    private List<TopicPartition> processReadyPartitions(Cluster cluster, List<TopicPartition> ready, long now) {
+        List<TopicPartition> sendable = new ArrayList<TopicPartition>(ready.size());
+        for (TopicPartition tp : ready) {
+            Node leader = cluster.leaderFor(tp);
+            if (leader == null) {
+                // we don't know about this topic/partition or it has no leader, re-fetch metadata
+                metadata.forceUpdate();
+                continue;
+            }
+
+            // TODO: encapsulate this logic somehow
+            NodeState state = nodeState.get(leader.id());
+            boolean needsConnect = state == null
+                                   || (state.state == ConnectionState.DISCONNECTED
+                                       && now - state.lastConnectAttempt > this.reconnectBackoffMs);
+            if (needsConnect) {
+                // we don't have a connection to this node right now, make one
+                initiateConnect(leader, now);
+            } else if (state.state == ConnectionState.CONNECTED && inFlightRequests.canSendMore(leader.id())) {
+                sendable.add(tp);
+            }
+        }
+        return sendable;
+    }
+
+ /**
+ * Initiate a connection to the given node
+ */
+ private void initiateConnect(Node node, long now) {
+ try {
+ selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), 64 * 1024 * 1024, 64 * 1024 * 1024); // TODO
+ // socket
+ // buffers
+ nodeState.put(node.id(), new NodeState(ConnectionState.CONNECTING, now));
+ } catch (IOException e) {
+ /* attempt failed, we'll try again after the backoff */
+ nodeState.put(node.id(), new NodeState(ConnectionState.DISCONNECTED, now));
+ /* maybe the problem is our metadata, update it */
+ metadata.forceUpdate();
+ }
+ }
+
+    /**
+     * Handle any closed connections: mark the node as disconnected, fail every request that was still in flight to
+     * it and release the memory of the affected batches.
+     *
+     * @param disconnects The ids of the nodes whose connections were closed
+     */
+    private void handleDisconnects(List<Integer> disconnects) {
+        for (int node : disconnects) {
+            // Record the disconnect even when nothing was in flight; otherwise an idle connection that drops would
+            // keep its old state forever and the node would never be reconnected.
+            NodeState state = this.nodeState.get(node);
+            if (state != null)
+                state.state = ConnectionState.DISCONNECTED;
+
+            for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
+                if (request.batches != null) {
+                    for (RecordBatch batch : request.batches.values())
+                        batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
+                    this.accumulator.deallocate(request.batches.values());
+                }
+                // The response to a lost metadata request will never arrive, so allow a new fetch; otherwise the
+                // in-progress flag would block all further metadata updates.
+                if (request.request.header().apiKey() == ApiKeys.METADATA.id)
+                    this.metadataFetchInProgress = false;
+            }
+        }
+    }
+
+    /**
+     * Mark every node whose connection was just established as connected.
+     */
+    private void handleConnects(List<Integer> connects) {
+        for (Integer id : connects) {
+            NodeState state = this.nodeState.get(id);
+            state.state = ConnectionState.CONNECTED;
+        }
+    }
+
+ /**
+ * Process completed sends
+ */
+ public void handleSends(List<NetworkSend> sends) {
+ /* if acks = 0 then the request is satisfied once sent */
+ for (NetworkSend send : sends) {
+ Deque<InFlightRequest> requests = this.inFlightRequests.requestQueue(send.destination());
+ InFlightRequest request = requests.peekFirst();
+ if (!request.expectResponse) {
+ requests.pollFirst();
+ if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) {
+ for (RecordBatch batch : request.batches.values())
+ batch.done(-1L, Errors.NONE.exception());
+ this.accumulator.deallocate(request.batches.values());
+ }
+ }
+ }
+ }
+
+    /**
+     * Handle responses from the server: match each response with the oldest in-flight request of its source node,
+     * parse it, verify the correlation id and dispatch on the api key of the request.
+     *
+     * @throws IllegalStateException if the request being answered is neither a produce nor a metadata request
+     */
+    private void handleResponses(List<NetworkReceive> receives, long now) {
+        for (NetworkReceive receive : receives) {
+            InFlightRequest req = inFlightRequests.nextCompleted(receive.source());
+            ResponseHeader responseHeader = ResponseHeader.parse(receive.payload());
+            RequestHeader requestHeader = req.request.header();
+            short apiKey = requestHeader.apiKey();
+            Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
+            correlate(requestHeader, responseHeader);
+            if (apiKey == ApiKeys.PRODUCE.id) {
+                handleProduceResponse(req, body);
+            } else if (apiKey == ApiKeys.METADATA.id) {
+                handleMetadataResponse(body, now);
+            } else {
+                throw new IllegalStateException("Unexpected response type: " + apiKey);
+            }
+        }
+    }
+
+    /**
+     * Handle a metadata response: clear the in-progress flag and install the parsed cluster as the current metadata.
+     */
+    private void handleMetadataResponse(Struct body, long now) {
+        // cleared first, so a response that fails to parse does not block later fetches
+        this.metadataFetchInProgress = false;
+        this.metadata.update(ProtoUtils.parseMetadataResponse(body), now);
+    }
+
+    /**
+     * Handle a produce response: complete the batch of every partition listed in the response with the reported
+     * base offset and error, then release the memory of all batches of the request.
+     */
+    private void handleProduceResponse(InFlightRequest request, Struct response) {
+        Object[] topicResponses = (Object[]) response.get("responses");
+        for (Object topicEntry : topicResponses) {
+            Struct topicStruct = (Struct) topicEntry;
+            String topic = (String) topicStruct.get("topic");
+            Object[] partitionResponses = (Object[]) topicStruct.get("partition_responses");
+            for (Object partitionEntry : partitionResponses) {
+                Struct partitionStruct = (Struct) partitionEntry;
+                int partition = (Integer) partitionStruct.get("partition");
+                short errorCode = (Short) partitionStruct.get("error_code");
+                long baseOffset = (Long) partitionStruct.get("base_offset");
+                TopicPartition tp = new TopicPartition(topic, partition);
+                request.batches.get(tp).done(baseOffset, Errors.forCode(errorCode).exception());
+            }
+        }
+        this.accumulator.deallocate(request.batches.values());
+    }
+
+    /**
+     * Check that the response carries the correlation id of the request it is supposed to answer.
+     *
+     * @throws IllegalStateException if the two correlation ids differ
+     */
+    private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
+        if (requestHeader.correlationId() == responseHeader.correlationId())
+            return;
+        throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
+                                        + ") does not match request ("
+                                        + requestHeader.correlationId()
+                                        + ")");
+    }
+
+    /**
+     * Create a metadata request for the given topics. The request always expects a response and carries no record
+     * batches.
+     *
+     * @param node The id of the node the request is sent to
+     * @param topics The topics to fetch metadata for
+     * @return The request, using the next correlation id
+     */
+    private InFlightRequest metadataRequest(int node, Set<String> topics) {
+        Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id));
+        // the previously built String[] copy of the topics was never used and has been dropped
+        body.set("topics", topics.toArray());
+        RequestSend send = new RequestSend(node, new RequestHeader(ApiKeys.METADATA.id, clientId, correlation++), body);
+        return new InFlightRequest(true, send, null);
+    }
+
+    /**
+     * Group the record batches by the leader of their partition and build one produce request per node.
+     */
+    private List<InFlightRequest> collate(Cluster cluster, List<RecordBatch> batches) {
+        Map<Integer, List<RecordBatch>> batchesByNode = new HashMap<Integer, List<RecordBatch>>();
+        for (RecordBatch batch : batches) {
+            int leaderId = cluster.leaderFor(batch.topicPartition).id();
+            List<RecordBatch> forNode = batchesByNode.get(leaderId);
+            if (forNode == null) {
+                forNode = new ArrayList<RecordBatch>();
+                batchesByNode.put(leaderId, forNode);
+            }
+            forNode.add(batch);
+        }
+
+        List<InFlightRequest> requests = new ArrayList<InFlightRequest>(batchesByNode.size());
+        for (Map.Entry<Integer, List<RecordBatch>> perNode : batchesByNode.entrySet()) {
+            requests.add(produceRequest(perNode.getKey(), acks, requestTimeout, perNode.getValue()));
+        }
+        return requests;
+    }
+
+ /**
+ * Create a produce request from the given record batches
+ *
+ * @param destination The id of the node the request is sent to
+ * @param acks The value of the "acks" field; 0 means that no response is expected
+ * @param timeout The value of the "timeout" field
+ * @param batches The batches to send, all destined for the given node
+ * @return The request together with its batches indexed by topic-partition
+ */
+ private InFlightRequest produceRequest(int destination, short acks, int timeout, List<RecordBatch> batches) {
+ // two views of the same batches: by partition (kept with the request so the batches can be completed later)
+ // and by topic (the layout of the request body)
+ Map<TopicPartition, RecordBatch> batchesByPartition = new HashMap<TopicPartition, RecordBatch>();
+ Map<String, List<RecordBatch>> batchesByTopic = new HashMap<String, List<RecordBatch>>();
+ for (RecordBatch batch : batches) {
+ batchesByPartition.put(batch.topicPartition, batch);
+ List<RecordBatch> found = batchesByTopic.get(batch.topicPartition.topic());
+ if (found == null) {
+ found = new ArrayList<RecordBatch>();
+ batchesByTopic.put(batch.topicPartition.topic(), found);
+ }
+ found.add(batch);
+ }
+ Struct produce = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id));
+ produce.set("acks", acks);
+ produce.set("timeout", timeout);
+ // one "topic_data" entry per topic, each holding one "data" entry per partition
+ List<Struct> topicDatas = new ArrayList<Struct>(batchesByTopic.size());
+ for (Map.Entry<String, List<RecordBatch>> entry : batchesByTopic.entrySet()) {
+ Struct topicData = produce.instance("topic_data");
+ topicData.set("topic", entry.getKey());
+ List<RecordBatch> parts = entry.getValue();
+ Object[] partitionData = new Object[parts.size()];
+ for (int i = 0; i < parts.size(); i++) {
+ ByteBuffer buffer = parts.get(i).records.buffer();
+ // flip() sets the limit to the current position and rewinds, so the bytes written so far can be read
+ buffer.flip();
+ Struct part = topicData.instance("data")
+ .set("partition", parts.get(i).topicPartition.partition())
+ .set("record_set", buffer);
+ partitionData[i] = part;
+ }
+ topicData.set("data", partitionData);
+ topicDatas.add(topicData);
+ }
+ produce.set("topic_data", topicDatas.toArray());
+
+ // each request consumes one correlation id
+ RequestHeader header = new RequestHeader(ApiKeys.PRODUCE.id, clientId, correlation++);
+ RequestSend send = new RequestSend(destination, header, produce);
+ // a response is expected unless acks is 0
+ return new InFlightRequest(acks != 0, send, batchesByPartition);
+ }
+
+    /**
+     * Wake up the selector used by this sender. Simply delegates to the selector.
+     */
+    public void wakeup() {
+        selector.wakeup();
+    }
+
+    /**
+     * The possible states of the connection to a node
+     */
+    private enum ConnectionState {
+        DISCONNECTED,
+        CONNECTING,
+        CONNECTED
+    }
+
+ /**
+ * The state of a node
+ */
+ private static final class NodeState {
+ private ConnectionState state;
+ private long lastConnectAttempt;
+
+ public NodeState(ConnectionState state, long lastConnectAttempt) {
+ this.state = state;
+ this.lastConnectAttempt = lastConnectAttempt;
+ }
+
+ public String toString() {
+ return "NodeState(" + state + ", " + lastConnectAttempt + ")";
+ }
+ }
+
+    /**
+     * A request that hasn't been fully processed yet
+     */
+    private static final class InFlightRequest {
+        public boolean expectResponse;
+        public Map<TopicPartition, RecordBatch> batches;
+        public RequestSend request;
+
+        /**
+         * @param expectResponse Whether a response will follow; if false the request is complete once it is sent
+         * @param request The request
+         * @param batches The record batches contained in the request if it is a produce request
+         */
+        public InFlightRequest(boolean expectResponse, RequestSend request, Map<TopicPartition, RecordBatch> batches) {
+            this.expectResponse = expectResponse;
+            this.request = request;
+            this.batches = batches;
+        }
+    }
+
+ /**
+ * A set of outstanding request queues for each node that have not yet received responses
+ */
+ private static final class InFlightRequests {
+ private final Map<Integer, Deque<InFlightRequest>> requests = new HashMap<Integer, Deque<InFlightRequest>>();
+
+ /**
+ * Add the given request to the queue for the node it was directed to
+ */
+ public void add(InFlightRequest request) {
+ Deque<InFlightRequest> reqs = this.requests.get(request.request.destination());
+ if (reqs == null) {
+ reqs = new ArrayDeque<InFlightRequest>();
+ this.requests.put(request.request.destination(), reqs);
+ }
+ reqs.addFirst(request);
+ }
+
+ public Deque<InFlightRequest> requestQueue(int node) {
+ Deque<InFlightRequest> reqs = requests.get(node);
+ if (reqs == null || reqs.isEmpty())
+ throw new IllegalStateException("Response from server for which there are no in-flight requests.");
+ return reqs;
+ }
+
+ /**
+ * Get the oldest request (the one that that will be completed next) for the given node
+ */
+ public InFlightRequest nextCompleted(int node) {
+ return requestQueue(node).pollLast();
+ }
+
+ /**
+ * Can we send more requests to this node?
+ *
+ * @param node Node in question
+ * @return true iff we have no requests still being sent to the given node
+ */
+ public boolean canSendMore(int node) {
+ Deque<InFlightRequest> queue = requests.get(node);
+ return queue == null || queue.isEmpty() || queue.peekFirst().request.complete();
+ }
+
+ /**
+ * Clear out all the in-flight requests for the given node and return them
+ *
+ * @param node The node
+ * @return All the in-flight requests for that node that have been removed
+ */
+ public Iterable<InFlightRequest> clearAll(int node) {
+ Deque<InFlightRequest> reqs = requests.get(node);
+ if (reqs == null) {
+ return Collections.emptyList();
+ } else {
+ return requests.remove(node);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
new file mode 100644
index 0000000..b5e792b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -0,0 +1,66 @@
+package org.apache.kafka.clients.tools;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.record.Records;
+
+
+/**
+ * A simple command line load generator: sends a fixed number of identical records of a given size to the topic
+ * "test" and prints the latency of the send calls per reporting window and the overall throughput.
+ */
+public class ProducerPerformance {
+
+    /**
+     * @param args url, num_records and record_size, in that order
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 3) {
+            System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url num_records record_size");
+            System.exit(1);
+        }
+        String url = args[0];
+        int numRecords = Integer.parseInt(args[1]);
+        int recordSize = Integer.parseInt(args[2]);
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, "1");
+        props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url);
+        props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
+        props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
+
+        KafkaProducer producer = new KafkaProducer(props);
+        Callback callback = new Callback() {
+            public void onCompletion(RecordMetadata metadata, Exception e) {
+                if (e != null)
+                    e.printStackTrace();
+            }
+        };
+        byte[] payload = new byte[recordSize];
+        Arrays.fill(payload, (byte) 1);
+        ProducerRecord record = new ProducerRecord("test", payload);
+        long start = System.currentTimeMillis();
+        long maxLatency = -1L;
+        long totalLatency = 0;
+        // number of sends in the current reporting window; the first window holds a single send (i == 0), so
+        // the average has to be taken over this count rather than over the reporting interval
+        int windowCount = 0;
+        int reportingInterval = 1000000;
+        for (int i = 0; i < numRecords; i++) {
+            long sendStart = System.currentTimeMillis();
+            producer.send(record, callback);
+            long sendEllapsed = System.currentTimeMillis() - sendStart;
+            maxLatency = Math.max(maxLatency, sendEllapsed);
+            totalLatency += sendEllapsed;
+            windowCount++;
+            if (i % reportingInterval == 0) {
+                System.out.printf("%d max latency = %d ms, avg latency = %.5f\n",
+                                  i,
+                                  maxLatency,
+                                  (totalLatency / (double) windowCount));
+                totalLatency = 0L;
+                maxLatency = -1L;
+                windowCount = 0;
+            }
+        }
+        long ellapsed = System.currentTimeMillis() - start;
+        double msgsSec = 1000.0 * numRecords / (double) ellapsed;
+        double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0);
+        // the unit used to be printed twice ("ms ms")
+        System.out.printf("%d records sent in %d ms. %.2f records per second (%.2f mb/sec).", numRecords, ellapsed, msgsSec, mbSec);
+        producer.close();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
new file mode 100644
index 0000000..634895c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -0,0 +1,124 @@
+package org.apache.kafka.common;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.common.utils.Utils;
+
+
+/**
+ * A view of a subset of the nodes, topics, and partitions in the Kafka cluster. The node and partition collections
+ * are copied and indexed at construction time.
+ */
+public final class Cluster {
+
+    private final AtomicInteger counter = new AtomicInteger(0);
+    private final List<Node> nodes;
+    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
+    private final Map<String, List<PartitionInfo>> partitionsByTopic;
+
+    /**
+     * Create a new cluster with the given nodes and partitions
+     * @param nodes The nodes in the cluster
+     * @param partitions Information about a subset of the topic-partitions this cluster hosts
+     */
+    public Cluster(Collection<Node> nodes, Collection<PartitionInfo> partitions) {
+        // keep our own copy of the nodes, in random order and read-only
+        List<Node> shuffled = new ArrayList<Node>(nodes);
+        Collections.shuffle(shuffled);
+        this.nodes = Collections.unmodifiableList(shuffled);
+
+        // index the partitions by topic/partition for quick lookup, and group them by topic
+        this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
+        Map<String, List<PartitionInfo>> grouped = new HashMap<String, List<PartitionInfo>>();
+        for (PartitionInfo info : partitions) {
+            this.partitionsByTopicPartition.put(new TopicPartition(info.topic(), info.partition()), info);
+            List<PartitionInfo> ofTopic = grouped.get(info.topic());
+            if (ofTopic == null) {
+                ofTopic = new ArrayList<PartitionInfo>();
+                grouped.put(info.topic(), ofTopic);
+            }
+            ofTopic.add(info);
+        }
+
+        // make the per-topic lists unmodifiable so we can hand them out in user-facing apis without risk of
+        // the client modifying the contents
+        this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(grouped.size());
+        for (Map.Entry<String, List<PartitionInfo>> group : grouped.entrySet()) {
+            this.partitionsByTopic.put(group.getKey(), Collections.unmodifiableList(group.getValue()));
+        }
+    }
+
+    /**
+     * Create an empty cluster instance with no nodes and no topic-partitions.
+     */
+    public static Cluster empty() {
+        List<Node> noNodes = new ArrayList<Node>(0);
+        List<PartitionInfo> noPartitions = new ArrayList<PartitionInfo>(0);
+        return new Cluster(noNodes, noPartitions);
+    }
+
+    /**
+     * Create a "bootstrap" cluster using the given list of host/ports. The nodes are given consecutive ids starting
+     * at Integer.MIN_VALUE.
+     * @param addresses The addresses
+     * @return A cluster for these hosts/ports
+     */
+    public static Cluster bootstrap(List<InetSocketAddress> addresses) {
+        List<Node> bootstrapNodes = new ArrayList<Node>();
+        int nextId = Integer.MIN_VALUE;
+        for (InetSocketAddress address : addresses) {
+            bootstrapNodes.add(new Node(nextId, address.getHostName(), address.getPort()));
+            nextId++;
+        }
+        return new Cluster(bootstrapNodes, new ArrayList<PartitionInfo>(0));
+    }
+
+    /**
+     * @return The known set of nodes
+     */
+    public List<Node> nodes() {
+        return this.nodes;
+    }
+
+    /**
+     * Get the current leader for the given topic-partition
+     * @param topicPartition The topic and partition we want to know the leader for
+     * @return The leader recorded for this topic-partition, or null if the partition is unknown
+     */
+    public Node leaderFor(TopicPartition topicPartition) {
+        PartitionInfo info = partitionsByTopicPartition.get(topicPartition);
+        return info == null ? null : info.leader();
+    }
+
+    /**
+     * Get the metadata for the specified partition
+     * @param topicPartition The topic and partition to fetch info for
+     * @return The metadata about the given topic and partition, or null if the partition is unknown
+     */
+    public PartitionInfo partition(TopicPartition topicPartition) {
+        return partitionsByTopicPartition.get(topicPartition);
+    }
+
+    /**
+     * Get the list of partitions for this topic
+     * @param topic The topic name
+     * @return An unmodifiable list of partitions, or null if the topic is unknown
+     */
+    public List<PartitionInfo> partitionsFor(String topic) {
+        return this.partitionsByTopic.get(topic);
+    }
+
+    /**
+     * Round-robin over the nodes in this cluster
+     * @throws IllegalStateException if the cluster has no nodes
+     */
+    public Node nextNode() {
+        int size = nodes.size();
+        if (size == 0)
+            throw new IllegalStateException("No known nodes.");
+        int position = Utils.abs(counter.getAndIncrement()) % size;
+        return this.nodes.get(position);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/Configurable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Configurable.java b/clients/src/main/java/org/apache/kafka/common/Configurable.java
new file mode 100644
index 0000000..37da357
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/Configurable.java
@@ -0,0 +1,15 @@
+package org.apache.kafka.common;
+
+import java.util.Map;
+
+/**
+ * A mix-in style interface for classes that are instantiated by reflection and need to take configuration parameters
+ */
+public interface Configurable {
+
+    /**
+     * Configure this class with the given key-value pairs
+     *
+     * @param configs The configuration, keyed by name
+     */
+    void configure(Map<String, ?> configs);
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/KafkaException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaException.java b/clients/src/main/java/org/apache/kafka/common/KafkaException.java
new file mode 100644
index 0000000..d529a04
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaException.java
@@ -0,0 +1,26 @@
+package org.apache.kafka.common;
+
+/**
+ * The base class of all other Kafka exceptions. It is unchecked and offers the four standard constructors.
+ */
+public class KafkaException extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+    public KafkaException() {
+        super();
+    }
+
+    public KafkaException(String message) {
+        super(message);
+    }
+
+    public KafkaException(Throwable cause) {
+        super(cause);
+    }
+
+    public KafkaException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/Metric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Metric.java b/clients/src/main/java/org/apache/kafka/common/Metric.java
new file mode 100644
index 0000000..5580070
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/Metric.java
@@ -0,0 +1,23 @@
+package org.apache.kafka.common;
+
+/**
+ * A numerical metric tracked for monitoring purposes
+ */
+public interface Metric {
+
+    /**
+     * @return The unique name of this metric
+     */
+    String name();
+
+    /**
+     * @return A description of what is measured...this will be "" if no description was given
+     */
+    String description();
+
+    /**
+     * @return The value of the metric
+     */
+    double value();
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/Node.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java
new file mode 100644
index 0000000..452cbb1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/Node.java
@@ -0,0 +1,76 @@
+package org.apache.kafka.common;
+
+/**
+ * Information about a Kafka node: its id, host and port. Instances are immutable and compare by all three values.
+ */
+public class Node {
+
+    private final int id;
+    private final String host;
+    private final int port;
+
+    public Node(int id, String host, int port) {
+        this.id = id;
+        this.host = host;
+        this.port = port;
+    }
+
+    /**
+     * The node id of this node
+     */
+    public int id() {
+        return id;
+    }
+
+    /**
+     * The host name for this node
+     */
+    public String host() {
+        return host;
+    }
+
+    /**
+     * The port for this node
+     */
+    public int port() {
+        return port;
+    }
+
+    @Override
+    public int hashCode() {
+        // same 31-based combination as before, in the order host, id, port
+        int hostHash = (host == null) ? 0 : host.hashCode();
+        return 31 * (31 * (31 + hostHash) + id) + port;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null || getClass() != obj.getClass())
+            return false;
+        Node that = (Node) obj;
+        boolean sameHost = (host == null) ? that.host == null : host.equals(that.host);
+        return sameHost && id == that.id && port == that.port;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("Node(");
+        text.append(id).append(", ").append(host).append(", ").append(port).append(")");
+        return text.toString();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
new file mode 100644
index 0000000..06babef
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
@@ -0,0 +1,58 @@
+package org.apache.kafka.common;
+
+/**
+ * Information about a topic-partition: the leader, the replicas and the in-sync replicas. The arrays passed to the
+ * constructor are stored and returned as-is, without copying.
+ */
+public class PartitionInfo {
+
+    private final String topic;
+    private final int partition;
+    private final Node leader;
+    private final Node[] replicas;
+    private final Node[] inSyncReplicas;
+
+    public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
+        this.inSyncReplicas = inSyncReplicas;
+        this.replicas = replicas;
+        this.leader = leader;
+        this.partition = partition;
+        this.topic = topic;
+    }
+
+    /**
+     * The topic name
+     */
+    public String topic() {
+        return topic;
+    }
+
+    /**
+     * The partition id
+     */
+    public int partition() {
+        return partition;
+    }
+
+    /**
+     * The node currently acting as a leader for this partition, exactly as given to the constructor.
+     * NOTE(review): presumably null when there is no leader -- confirm against the code that builds instances.
+     */
+    public Node leader() {
+        return leader;
+    }
+
+    /**
+     * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
+     */
+    public Node[] replicas() {
+        return replicas;
+    }
+
+    /**
+     * The subset of the replicas that are in sync, that is caught-up to the leader and ready to take over as leader if
+     * the leader should fail
+     */
+    public Node[] inSyncReplicas() {
+        return inSyncReplicas;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
new file mode 100644
index 0000000..7ac0604
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
@@ -0,0 +1,61 @@
+package org.apache.kafka.common;
+
+/**
+ * An immutable (topic name, partition number) pair with value-based equality, suitable for use as a map key.
+ */
+public final class TopicPartition {
+
+    /* lazily computed hash code; 0 means "not computed yet" (a genuine hash of 0 is just recomputed on every call) */
+    private int hash = 0;
+    private final int partition;
+    private final String topic;
+
+    /**
+     * @param topic The topic name
+     * @param partition The partition number
+     */
+    public TopicPartition(String topic, int partition) {
+        this.topic = topic;
+        this.partition = partition;
+    }
+
+    /**
+     * The partition number
+     */
+    public int partition() {
+        return this.partition;
+    }
+
+    /**
+     * The topic name
+     */
+    public String topic() {
+        return this.topic;
+    }
+
+    /**
+     * Combines the partition number and the topic's hash with the usual 31-multiplier scheme and caches the result.
+     */
+    @Override
+    public int hashCode() {
+        int cached = this.hash;
+        if (cached != 0) {
+            return cached;
+        }
+        int topicHash = 0;
+        if (topic != null) {
+            topicHash = topic.hashCode();
+        }
+        /* same value as starting at 1 and folding in the partition, then the topic hash, with multiplier 31 */
+        int computed = 31 * (31 + partition) + topicHash;
+        this.hash = computed;
+        return computed;
+    }
+
+    /**
+     * Two instances are equal when they are of exactly this class and have the same partition and the same topic (two
+     * null topics count as the same).
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null || obj.getClass() != getClass()) {
+            return false;
+        }
+        TopicPartition that = (TopicPartition) obj;
+        if (this.partition != that.partition) {
+            return false;
+        }
+        return this.topic == null ? that.topic == null : this.topic.equals(that.topic);
+    }
+
+    /**
+     * Renders as {@code <topic>-<partition>}
+     */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+        text.append(topic).append("-").append(partition);
+        return text.toString();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
new file mode 100644
index 0000000..5d548d0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -0,0 +1,94 @@
+package org.apache.kafka.common.config;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
+
+
+/**
+ * A convenient base class for configurations to extend.
+ * <p>
+ * This class holds both the original configuration that was provided as well as the parsed values produced by the
+ * {@link ConfigDef}. It also records which keys have been read through the accessors so that {@link #unused()} can
+ * report configurations that were supplied but never looked at.
+ */
+public class AbstractConfig {
+
+    /* keys that have been read via get(); wrapped in a synchronized set when it is created in the constructor */
+    private final Set<String> used;
+    /* parsed values, keyed by config name */
+    private final Map<String, Object> values;
+    /* the configuration exactly as the user supplied it */
+    private final Map<String, ?> originals;
+
+    /**
+     * Parse the supplied configuration against the given definition.
+     *
+     * @param definition The definition used to parse the values
+     * @param originals The user-supplied configuration; every key must be a String
+     * @throws ConfigException If a key is not a String (further failures may come from {@link ConfigDef#parse(Map)})
+     */
+    @SuppressWarnings("unchecked")
+    public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
+        /* check that all the keys are really strings */
+        for (Object key : originals.keySet())
+            if (!(key instanceof String))
+                /* String.valueOf so that a null key yields a ConfigException rather than a NullPointerException */
+                throw new ConfigException(String.valueOf(key), originals.get(key), "Key must be a string.");
+        this.originals = (Map<String, ?>) originals;
+        this.values = definition.parse(this.originals);
+        this.used = Collections.synchronizedSet(new HashSet<String>());
+    }
+
+    /**
+     * Look up the parsed value for a key and mark the key as used.
+     *
+     * @param key The configuration name
+     * @return The parsed value
+     * @throws ConfigException If the key is not among the parsed values
+     */
+    protected Object get(String key) {
+        if (!values.containsKey(key))
+            throw new ConfigException(String.format("Unknown configuration '%s'", key));
+        used.add(key);
+        return values.get(key);
+    }
+
+    public int getInt(String key) {
+        return (Integer) get(key);
+    }
+
+    public Long getLong(String key) {
+        return (Long) get(key);
+    }
+
+    @SuppressWarnings("unchecked")
+    public List<String> getList(String key) {
+        return (List<String>) get(key);
+    }
+
+    public boolean getBoolean(String key) {
+        return (Boolean) get(key);
+    }
+
+    public String getString(String key) {
+        return (String) get(key);
+    }
+
+    public Class<?> getClass(String key) {
+        return (Class<?>) get(key);
+    }
+
+    /**
+     * The keys of the original configuration that have not been read through any of the accessors so far.
+     *
+     * @return A new set owned by the caller; later reads do not change it
+     */
+    public Set<String> unused() {
+        Set<String> keys = new HashSet<String>(originals.keySet());
+        /*
+         * removeAll, not remove: remove(used) would look for the set object itself as an element and so never remove
+         * anything. removeAll may iterate over the synchronized set, which requires holding its lock manually.
+         */
+        synchronized (used) {
+            keys.removeAll(used);
+        }
+        return keys;
+    }
+
+    /**
+     * Get a configured instance of the given class specified by the given configuration key. If the object implements
+     * Configurable configure it using the original (unparsed) configuration.
+     *
+     * @param key The configuration key for the class
+     * @param t The interface the class should implement
+     * @return A configured instance of the class, or null if the value for the key is null
+     * @throws KafkaException If the instantiated object is not an instance of {@code t}
+     */
+    public <T> T getConfiguredInstance(String key, Class<T> t) {
+        Class<?> c = getClass(key);
+        if (c == null)
+            return null;
+        Object o = Utils.newInstance(c);
+        if (!t.isInstance(o))
+            throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
+        if (o instanceof Configurable)
+            ((Configurable) o).configure(this.originals);
+        return t.cast(o);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
new file mode 100644
index 0000000..cc4bc48
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -0,0 +1,253 @@
+package org.apache.kafka.common.config;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is used for specifying the set of expected configurations, their type, their defaults, their
+ * documentation, and any special validation logic used for checking the correctness of the values the user provides.
+ * <p>
+ * Usage of this class looks something like this:
+ *
+ * <pre>
+ * ConfigDef defs = new ConfigDef();
+ * defs.define("config_name", Type.STRING, "default string value", "This configuration is used for blah blah blah.");
+ * defs.define("another_config_name", Type.INT, 42, Range.atLeast(0), "More documentation on this config");
+ *
+ * Properties props = new Properties();
+ * props.setProperty("config_name", "some value");
+ * Map&lt;String, Object&gt; configs = defs.parse(props);
+ *
+ * String someConfig = (String) configs.get("config_name"); // will return "some value"
+ * int anotherConfig = (Integer) configs.get("another_config_name"); // will return default value of 42
+ * </pre>
+ *
+ * This class can be used stand-alone or in combination with {@link AbstractConfig} which provides some additional
+ * functionality for accessing configs.
+ */
+public class ConfigDef {
+
+    /* sentinel marking a key as required; compared by identity, never handed to a validator or returned by parse() */
+    private static final Object NO_DEFAULT_VALUE = new Object();
+
+    private final Map<String, ConfigKey> configKeys = new HashMap<String, ConfigKey>();
+
+    /**
+     * Define a new configuration. The default value is parsed to the declared type and, if a validator is given,
+     * validated immediately.
+     * @param name The name of the config parameter
+     * @param type The type of the config
+     * @param defaultValue The default value to use if this config isn't present
+     * @param validator A validator to use in checking the correctness of the config
+     * @param documentation The documentation string for the config
+     * @return This ConfigDef so you can chain calls
+     * @throws ConfigException If the name is already defined or the default value is invalid
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, String documentation) {
+        if (configKeys.containsKey(name))
+            throw new ConfigException("Configuration " + name + " is defined twice.");
+        Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
+        configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, documentation));
+        return this;
+    }
+
+    /**
+     * Define a new configuration with no special validation logic
+     * @param name The name of the config parameter
+     * @param type The type of the config
+     * @param defaultValue The default value to use if this config isn't present
+     * @param documentation The documentation string for the config
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, String documentation) {
+        return define(name, type, defaultValue, null, documentation);
+    }
+
+    /**
+     * Define a required parameter with no default value
+     * @param name The name of the config parameter
+     * @param type The type of the config
+     * @param validator A validator to use in checking the correctness of the config
+     * @param documentation The documentation string for the config
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Validator validator, String documentation) {
+        return define(name, type, NO_DEFAULT_VALUE, validator, documentation);
+    }
+
+    /**
+     * Define a required parameter with no default value and no special validation logic
+     * @param name The name of the config parameter
+     * @param type The type of the config
+     * @param documentation The documentation string for the config
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, String documentation) {
+        return define(name, type, NO_DEFAULT_VALUE, null, documentation);
+    }
+
+    /**
+     * Parse and validate configs against this configuration definition. The input is a map of configs. It is expected
+     * that the keys of the map are strings, but the values can either be strings or they may already be of the
+     * appropriate type (int, string, etc). This will work equally well with either java.util.Properties instances or a
+     * programmatically constructed map.
+     * <p>
+     * Only keys that have been defined are looked at; other entries in the input are ignored. Values supplied in the
+     * input are run through the key's validator, if it has one.
+     * @param props The configs to parse and validate
+     * @return Parsed and validated configs. The key will be the config name and the value will be the value parsed into
+     *         the appropriate type (int, string, etc)
+     * @throws ConfigException If a required config is missing, or a supplied value cannot be parsed or is rejected by
+     *         the validator
+     */
+    public Map<String, Object> parse(Map<?, ?> props) {
+        /* parse all known keys */
+        Map<String, Object> values = new HashMap<String, Object>();
+        for (ConfigKey key : configKeys.values()) {
+            Object value;
+            if (props.containsKey(key.name)) {
+                value = parseType(key.name, props.get(key.name), key.type);
+                /* defaults were validated when the key was defined; user-supplied values have to be checked here */
+                if (key.validator != null)
+                    key.validator.ensureValid(key.name, value);
+            } else if (key.defaultValue == NO_DEFAULT_VALUE) {
+                throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
+            } else {
+                value = key.defaultValue;
+            }
+            values.put(key.name, value);
+        }
+        return values;
+    }
+
+    /**
+     * Parse a value according to its expected type. String values are trimmed before being interpreted.
+     * @param name The config name
+     * @param value The config value
+     * @param type The expected type
+     * @return The parsed object
+     * @throws ConfigException If the value cannot be converted to the expected type
+     */
+    private Object parseType(String name, Object value, Type type) {
+        try {
+            String trimmed = null;
+            if (value instanceof String)
+                trimmed = ((String) value).trim();
+            switch (type) {
+                case BOOLEAN:
+                    if (value instanceof String)
+                        return Boolean.parseBoolean(trimmed);
+                    else if (value instanceof Boolean)
+                        return value;
+                    else
+                        throw new ConfigException(name, value, "Expected value to be either true or false");
+                case STRING:
+                    if (value instanceof String)
+                        return trimmed;
+                    else
+                        /* guard the null case so the caller gets a ConfigException instead of a NullPointerException */
+                        throw new ConfigException(name, value, "Expected value to be a string, but it was "
+                                                               + (value == null ? "null" : "a " + value.getClass().getName()));
+                case INT:
+                    if (value instanceof Integer) {
+                        return (Integer) value;
+                    } else if (value instanceof String) {
+                        return Integer.parseInt(trimmed);
+                    } else {
+                        throw new ConfigException(name, value, "Expected value to be an number.");
+                    }
+                case LONG:
+                    /* an Integer is widened so that callers can pass plain int literals for LONG configs */
+                    if (value instanceof Integer)
+                        return ((Integer) value).longValue();
+                    if (value instanceof Long)
+                        return (Long) value;
+                    else if (value instanceof String)
+                        return Long.parseLong(trimmed);
+                    else
+                        throw new ConfigException(name, value, "Expected value to be an number.");
+                case DOUBLE:
+                    if (value instanceof Number)
+                        return ((Number) value).doubleValue();
+                    else if (value instanceof String)
+                        return Double.parseDouble(trimmed);
+                    else
+                        throw new ConfigException(name, value, "Expected value to be an number.");
+                case LIST:
+                    if (value instanceof List)
+                        return (List<?>) value;
+                    else if (value instanceof String)
+                        /* split on commas, swallowing the whitespace around them; -1 keeps trailing empty elements */
+                        return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
+                    else
+                        throw new ConfigException(name, value, "Expected a comma seperated list.");
+                case CLASS:
+                    if (value instanceof Class)
+                        return (Class<?>) value;
+                    else if (value instanceof String)
+                        return Class.forName(trimmed);
+                    else
+                        throw new ConfigException(name, value, "Expected a Class instance or class name.");
+                default:
+                    throw new IllegalStateException("Unknown type.");
+            }
+        } catch (NumberFormatException e) {
+            throw new ConfigException(name, value, "Not a number of type " + type);
+        } catch (ClassNotFoundException e) {
+            throw new ConfigException(name, value, "Class " + value + " could not be found.");
+        }
+    }
+
+    /**
+     * The config types
+     */
+    public enum Type {
+        BOOLEAN, STRING, INT, LONG, DOUBLE, LIST, CLASS;
+    }
+
+    /**
+     * Validation logic the user may provide
+     */
+    public interface Validator {
+        /**
+         * Check a parsed value, throwing (typically a {@link ConfigException}) if it is not acceptable.
+         * @param name The config name
+         * @param o The parsed value
+         */
+        public void ensureValid(String name, Object o);
+    }
+
+    /**
+     * Validation logic for numeric ranges. Bounds are inclusive and compared as doubles.
+     */
+    public static class Range implements Validator {
+        private final Number min;
+        private final Number max;
+
+        private Range(Number min, Number max) {
+            this.min = min;
+            this.max = max;
+        }
+
+        /**
+         * A numeric range that checks only the lower bound (the upper bound is Double.MAX_VALUE)
+         * @param min The minimum acceptable value
+         */
+        public static Range atLeast(Number min) {
+            return new Range(min, Double.MAX_VALUE);
+        }
+
+        /**
+         * A numeric range that checks both the upper and lower bound
+         * @param min The minimum acceptable value
+         * @param max The maximum acceptable value
+         */
+        public static Range between(Number min, Number max) {
+            return new Range(min, max);
+        }
+
+        /**
+         * @throws ConfigException If the value lies outside [min, max]
+         */
+        public void ensureValid(String name, Object o) {
+            Number n = (Number) o;
+            if (n.doubleValue() < min.doubleValue() || n.doubleValue() > max.doubleValue())
+                throw new ConfigException(name, o, "Value must be in the range [" + min + ", " + max + "]");
+        }
+    }
+
+    /**
+     * The definition of a single config: its name, type, parsed default (or the NO_DEFAULT_VALUE sentinel), optional
+     * validator and documentation.
+     */
+    private static class ConfigKey {
+        public final String name;
+        public final Type type;
+        public final String documentation;
+        public final Object defaultValue;
+        public final Validator validator;
+
+        public ConfigKey(String name, Type type, Object defaultValue, Validator validator, String documentation) {
+            super();
+            this.name = name;
+            this.type = type;
+            this.defaultValue = defaultValue;
+            this.validator = validator;
+            /* a required key carries the sentinel, which is not a real value and must not reach the validator */
+            if (this.validator != null && defaultValue != NO_DEFAULT_VALUE)
+                this.validator.ensureValid(name, defaultValue);
+            this.documentation = documentation;
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/config/ConfigException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigException.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigException.java
new file mode 100644
index 0000000..c2a59cf
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigException.java
@@ -0,0 +1,24 @@
+package org.apache.kafka.common.config;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Thrown if the user supplies an invalid configuration
+ */
+public class ConfigException extends KafkaException {
+
+ private static final long serialVersionUID = 1L;
+
+ public ConfigException(String message) {
+ super(message);
+ }
+
+ public ConfigException(String name, Object value) {
+ this(name, value, null);
+ }
+
+ public ConfigException(String name, Object value, String message) {
+ super("Invalid value " + value + " for configuration " + name + (message == null ? "" : ": " + message));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java b/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java
new file mode 100644
index 0000000..46c4002
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java
@@ -0,0 +1,35 @@
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Base class for API exceptions. Any exception that is part of the public protocol should be a subclass of this class
+ * and live in this package.
+ * <p>
+ * Instances do not capture a stack trace, see {@link #fillInStackTrace()}.
+ */
+public abstract class ApiException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ApiException() {
+        super();
+    }
+
+    /**
+     * @param message The error message
+     */
+    public ApiException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause The underlying cause
+     */
+    public ApiException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message The error message
+     * @param cause The underlying cause
+     */
+    public ApiException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Deliberately does nothing: the stack trace is expensive to build and of no use for api exceptions.
+     *
+     * @return this exception, without a stack trace filled in
+     */
+    @Override
+    public Throwable fillInStackTrace() {
+        return this;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java b/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
new file mode 100644
index 0000000..448f627
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
@@ -0,0 +1,23 @@
+package org.apache.kafka.common.errors;
+
+/**
+ * An API exception for a message that failed its CRC checksum or is otherwise corrupt (this is what the default
+ * message of the no-argument constructor says).
+ */
+public class CorruptRecordException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message The error message
+     * @param cause The underlying cause
+     */
+    public CorruptRecordException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause The underlying cause
+     */
+    public CorruptRecordException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message The error message
+     */
+    public CorruptRecordException(String message) {
+        super(message);
+    }
+
+    /**
+     * Create the exception with a default message.
+     */
+    public CorruptRecordException() {
+        super("This message has failed it's CRC checksum or is otherwise corrupt.");
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
new file mode 100644
index 0000000..571fdd7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
@@ -0,0 +1,19 @@
+package org.apache.kafka.common.errors;
+
+/**
+ * A retryable API exception. NOTE(review): judging by the name it is raised when a partition currently has no leader
+ * available -- confirm against the error-code mapping.
+ */
+public class LeaderNotAvailableException extends RetryableException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message The error message
+     */
+    public LeaderNotAvailableException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause The underlying cause
+     */
+    public LeaderNotAvailableException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message The error message
+     * @param cause The underlying cause
+     */
+    public LeaderNotAvailableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java b/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java
new file mode 100644
index 0000000..14621e7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java
@@ -0,0 +1,23 @@
+package org.apache.kafka.common.errors;
+
+/**
+ * An API exception for failures at the network level. NOTE(review): the exact conditions are decided by the code that
+ * throws it and are not visible here.
+ */
+public class NetworkException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public NetworkException() {
+    }
+
+    /**
+     * @param message The error message
+     */
+    public NetworkException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause The underlying cause
+     */
+    public NetworkException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message The error message
+     * @param cause The underlying cause
+     */
+    public NetworkException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
new file mode 100644
index 0000000..2404afb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
@@ -0,0 +1,23 @@
+package org.apache.kafka.common.errors;
+
+/**
+ * A retryable API exception. NOTE(review): judging by the name it is raised when a request reaches a node that is not
+ * the leader for the partition -- confirm against the error-code mapping.
+ */
+public class NotLeaderForPartitionException extends RetryableException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message The error message
+     * @param cause The underlying cause
+     */
+    public NotLeaderForPartitionException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause The underlying cause
+     */
+    public NotLeaderForPartitionException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message The error message
+     */
+    public NotLeaderForPartitionException(String message) {
+        super(message);
+    }
+
+    public NotLeaderForPartitionException() {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
new file mode 100644
index 0000000..e82dcc2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
@@ -0,0 +1,22 @@
+package org.apache.kafka.common.errors;
+
+/**
+ * An API exception. NOTE(review): judging by the name it is raised when the metadata attached to an offset is larger
+ * than allowed -- confirm against the error-code mapping.
+ */
+public class OffsetMetadataTooLarge extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message The error message
+     * @param cause The underlying cause
+     */
+    public OffsetMetadataTooLarge(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause The underlying cause
+     */
+    public OffsetMetadataTooLarge(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message The error message
+     */
+    public OffsetMetadataTooLarge(String message) {
+        super(message);
+    }
+
+    public OffsetMetadataTooLarge() {
+        super();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
new file mode 100644
index 0000000..fa0a673
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
@@ -0,0 +1,22 @@
+package org.apache.kafka.common.errors;
+
+/**
+ * An API exception. NOTE(review): judging by the name it is raised when a requested offset lies outside the range
+ * that is available -- confirm against the error-code mapping.
+ */
+public class OffsetOutOfRangeException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message The error message
+     * @param cause The underlying cause
+     */
+    public OffsetOutOfRangeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause The underlying cause
+     */
+    public OffsetOutOfRangeException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message The error message
+     */
+    public OffsetOutOfRangeException(String message) {
+        super(message);
+    }
+
+    public OffsetOutOfRangeException() {
+        super();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
new file mode 100644
index 0000000..f06065e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
@@ -0,0 +1,23 @@
+package org.apache.kafka.common.errors;
+
+/**
+ * An API exception. NOTE(review): judging by the name it is raised when a record exceeds a size limit -- confirm
+ * against the code that throws it.
+ */
+public class RecordTooLargeException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public RecordTooLargeException() {
+    }
+
+    /**
+     * @param message The error message
+     */
+    public RecordTooLargeException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause The underlying cause
+     */
+    public RecordTooLargeException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message The error message
+     * @param cause The underlying cause
+     */
+    public RecordTooLargeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java b/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java
new file mode 100644
index 0000000..53c5e8d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java
@@ -0,0 +1,31 @@
+package org.apache.kafka.common.errors;
+
+/**
+ * Base class for exceptions that are safe to retry. To be retryable an exception should be
+ * <ol>
+ * <li>Transient: there is no point retrying an error caused by a non-existent topic or a message that is too large
+ * <li>Idempotent: the failure is known not to have changed any state on the server
+ * </ol>
+ * A client may choose to retry any request it likes, but exceptions extending this class are always safe and sane to
+ * retry.
+ */
+public abstract class RetryableException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public RetryableException() {
+        super();
+    }
+
+    /**
+     * @param message The error message
+     */
+    public RetryableException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause The underlying cause
+     */
+    public RetryableException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message The error message
+     * @param cause The underlying cause
+     */
+    public RetryableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java b/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java
new file mode 100644
index 0000000..5e1bb67
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java
@@ -0,0 +1,23 @@
+package org.apache.kafka.common.errors;
+
+/**
+ * An API exception. NOTE(review): judging by the name it is raised when an operation does not complete within its
+ * allotted time -- confirm against the code that throws it.
+ */
+public class TimeoutException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TimeoutException() {
+    }
+
+    /**
+     * @param message The error message
+     */
+    public TimeoutException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause The underlying cause
+     */
+    public TimeoutException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message The error message
+     * @param cause The underlying cause
+     */
+    public TimeoutException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java
new file mode 100644
index 0000000..73fd755
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java
@@ -0,0 +1,22 @@
+package org.apache.kafka.common.errors;
+
+/**
+ * An API exception. NOTE(review): judging by the name it stands for a server-side error with no more specific
+ * classification -- confirm against the error-code mapping.
+ */
+public class UnknownServerException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message The error message
+     * @param cause The underlying cause
+     */
+    public UnknownServerException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause The underlying cause
+     */
+    public UnknownServerException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message The error message
+     */
+    public UnknownServerException(String message) {
+        super(message);
+    }
+
+    public UnknownServerException() {
+        super();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
new file mode 100644
index 0000000..2376bff
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
@@ -0,0 +1,22 @@
+package org.apache.kafka.common.errors;
+
+/**
+ * An API exception. NOTE(review): judging by the name it is raised when a request refers to a topic or partition the
+ * server does not know -- confirm against the error-code mapping.
+ */
+public class UnknownTopicOrPartitionException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message The error message
+     * @param cause The underlying cause
+     */
+    public UnknownTopicOrPartitionException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause The underlying cause
+     */
+    public UnknownTopicOrPartitionException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message The error message
+     */
+    public UnknownTopicOrPartitionException(String message) {
+        super(message);
+    }
+
+    public UnknownTopicOrPartitionException() {
+        super();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/metrics/CompoundStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/CompoundStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/CompoundStat.java
new file mode 100644
index 0000000..a421f65
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/CompoundStat.java
@@ -0,0 +1,40 @@
+package org.apache.kafka.common.metrics;
+
+import java.util.List;
+
+/**
+ * A compound stat is a stat where a single measurement and associated data structure feeds many metrics. A histogram
+ * with its many associated percentiles is an example.
+ */
+public interface CompoundStat extends Stat {
+
+    /**
+     * The named measurables exposed by this stat
+     */
+    List<NamedMeasurable> stats();
+
+    /**
+     * An immutable holder pairing a {@link Measurable} with a name and a description.
+     */
+    class NamedMeasurable {
+
+        private final String name;
+        private final String description;
+        private final Measurable stat;
+
+        /**
+         * @param name The name of the measurable
+         * @param description The description of the measurable
+         * @param stat The measurable itself
+         */
+        public NamedMeasurable(String name, String description, Measurable stat) {
+            this.name = name;
+            this.description = description;
+            this.stat = stat;
+        }
+
+        public String name() {
+            return this.name;
+        }
+
+        public String description() {
+            return this.description;
+        }
+
+        public Measurable stat() {
+            return this.stat;
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
new file mode 100644
index 0000000..57a6d98
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -0,0 +1,185 @@
+package org.apache.kafka.common.metrics;
+
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.InvalidAttributeValueException;
+import javax.management.JMException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import org.apache.kafka.common.KafkaException;
+
+
+/**
+ * Register metrics in JMX as dynamic mbeans based on the metric names.
+ * <p>
+ * The (prefixed) metric name is split on its last two dots: the text before them becomes the domain of the mbean's
+ * {@link ObjectName}, the middle part becomes its {@code type} key and the last part is the attribute name under
+ * which the metric is exposed.
+ * <p>
+ * NOTE(review): the attribute map inside each mbean is a plain {@link HashMap} that is written from the synchronized
+ * methods of this reporter but read by the {@link DynamicMBean} methods without any lock - confirm whether JMX reads
+ * can overlap with metric registration.
+ */
+public class JmxReporter implements MetricsReporter {
+
+    private final String prefix;
+    /* the mbeans created so far, keyed by "packageName.beanName" */
+    private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();
+
+    /**
+     * Create a JMX reporter that adds no prefix to the metric names.
+     */
+    public JmxReporter() {
+        this("");
+    }
+
+    /**
+     * Create a JMX reporter that prefixes all metrics with the given string.
+     */
+    public JmxReporter(String prefix) {
+        this.prefix = prefix;
+    }
+
+    /**
+     * Add every given metric as an attribute of its mbean and then (re)register all known mbeans.
+     * @param metrics The metrics to expose
+     */
+    @Override
+    public synchronized void init(List<KafkaMetric> metrics) {
+        for (KafkaMetric metric : metrics)
+            addAttribute(metric);
+        for (KafkaMbean mbean : mbeans.values())
+            reregister(mbean);
+    }
+
+    /**
+     * Add the metric as an attribute of its mbean and re-register that mbean.
+     * @param metric The new or changed metric
+     */
+    @Override
+    public synchronized void metricChange(KafkaMetric metric) {
+        KafkaMbean mbean = addAttribute(metric);
+        reregister(mbean);
+    }
+
+    /**
+     * Store the metric as an attribute of the mbean its name maps to, creating the mbean on first use.
+     * @param metric The metric to add
+     * @return The mbean that now holds the metric
+     * @throws KafkaException if the mbean cannot be created from the metric name
+     */
+    private KafkaMbean addAttribute(KafkaMetric metric) {
+        try {
+            String[] names = split(prefix + metric.name());
+            String qualifiedName = names[0] + "." + names[1];
+            KafkaMbean mbean = this.mbeans.get(qualifiedName);
+            if (mbean == null) {
+                mbean = new KafkaMbean(names[0], names[1]);
+                this.mbeans.put(qualifiedName, mbean);
+            }
+            mbean.setAttribute(names[2], metric);
+            return mbean;
+        } catch (JMException e) {
+            throw new KafkaException("Error creating mbean attribute " + metric.name(), e);
+        }
+    }
+
+    /**
+     * Unregister every mbean this reporter has created from the platform mbean server.
+     */
+    public synchronized void close() {
+        for (KafkaMbean mbean : this.mbeans.values())
+            unregister(mbean);
+    }
+
+    /**
+     * Remove the mbean from the platform mbean server if it is currently registered there.
+     * @throws KafkaException if the mbean server reports an error
+     */
+    private void unregister(KafkaMbean mbean) {
+        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+        try {
+            if (server.isRegistered(mbean.name()))
+                server.unregisterMBean(mbean.name());
+        } catch (JMException e) {
+            throw new KafkaException("Error unregistering mbean", e);
+        }
+    }
+
+    /**
+     * Unregister the mbean (if present) and register it again.
+     * @throws KafkaException if the mbean server reports an error
+     */
+    private void reregister(KafkaMbean mbean) {
+        unregister(mbean);
+        try {
+            ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, mbean.name());
+        } catch (JMException e) {
+            throw new KafkaException("Error registering mbean " + mbean.name(), e);
+        }
+    }
+
+    /**
+     * Split a dotted metric name into its three parts.
+     * @param name The full metric name, which must contain at least one '.'
+     * @return A three element array of package name, bean name and attribute name; the package name is the empty
+     *         string when the name contains only one '.'
+     * @throws IllegalArgumentException if the name contains no '.'
+     */
+    private String[] split(String name) {
+        int attributeStart = name.lastIndexOf('.');
+        if (attributeStart < 0)
+            throw new IllegalArgumentException("No MBean name in metric name: " + name);
+        String attributeName = name.substring(attributeStart + 1);
+        String remainder = name.substring(0, attributeStart);
+        int beanStart = remainder.lastIndexOf('.');
+        if (beanStart < 0)
+            return new String[] { "", remainder, attributeName };
+        String packageName = remainder.substring(0, beanStart);
+        String beanName = remainder.substring(beanStart + 1);
+        return new String[] { packageName, beanName, attributeName };
+    }
+
+    /**
+     * A read-only dynamic mbean that exposes each of its metrics as a double-valued attribute.
+     */
+    private static class KafkaMbean implements DynamicMBean {
+        private final ObjectName objectName;
+        private final Map<String, KafkaMetric> metrics;
+
+        /**
+         * @param packageName The domain part of the object name
+         * @param beanName The value of the "type" key of the object name
+         * @throws MalformedObjectNameException if the two parts do not form a valid object name
+         */
+        public KafkaMbean(String packageName, String beanName) throws MalformedObjectNameException {
+            this.metrics = new HashMap<String, KafkaMetric>();
+            this.objectName = new ObjectName(packageName + ":type=" + beanName);
+        }
+
+        public ObjectName name() {
+            return objectName;
+        }
+
+        /**
+         * Expose the metric under the given attribute name, replacing any metric previously stored under that name.
+         */
+        public void setAttribute(String name, KafkaMetric metric) {
+            this.metrics.put(name, metric);
+        }
+
+        /**
+         * @return the current value of the metric stored under the given attribute name
+         * @throws AttributeNotFoundException if no metric is stored under that name
+         */
+        @Override
+        public Object getAttribute(String name) throws AttributeNotFoundException, MBeanException, ReflectionException {
+            KafkaMetric metric = this.metrics.get(name);
+            if (metric == null)
+                throw new AttributeNotFoundException("Could not find attribute " + name);
+            return metric.value();
+        }
+
+        /**
+         * Read several attributes at once. An attribute that cannot be read is left out of the result; the
+         * attributes that could be read are still returned.
+         */
+        @Override
+        public AttributeList getAttributes(String[] names) {
+            AttributeList list = new AttributeList();
+            for (String name : names) {
+                try {
+                    list.add(new Attribute(name, getAttribute(name)));
+                } catch (Exception ignored) {
+                    // DynamicMBean.getAttributes reports a failed attribute by omitting it from the returned list,
+                    // so one unreadable attribute must not discard the values that were read successfully.
+                }
+            }
+            return list;
+        }
+
+        /**
+         * Describe this mbean: one readable, non-writable attribute of type double per metric.
+         */
+        @Override
+        public MBeanInfo getMBeanInfo() {
+            MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[metrics.size()];
+            int i = 0;
+            for (Map.Entry<String, KafkaMetric> entry : this.metrics.entrySet()) {
+                String attribute = entry.getKey();
+                KafkaMetric metric = entry.getValue();
+                attrs[i] = new MBeanAttributeInfo(attribute, double.class.getName(), metric.description(), true, false, false);
+                i += 1;
+            }
+            // the first argument of MBeanInfo is the Java class name of the mbean, not its display name
+            return new MBeanInfo(this.getClass().getName(), "", attrs, null, null, null);
+        }
+
+        /**
+         * This mbean exposes no operations.
+         */
+        @Override
+        public Object invoke(String name, Object[] params, String[] sig) throws MBeanException, ReflectionException {
+            throw new UnsupportedOperationException("Invoke not allowed.");
+        }
+
+        /**
+         * Attributes are read-only through JMX.
+         */
+        @Override
+        public void setAttribute(Attribute attribute) throws AttributeNotFoundException,
+                                                     InvalidAttributeValueException,
+                                                     MBeanException,
+                                                     ReflectionException {
+            throw new UnsupportedOperationException("Set not allowed.");
+        }
+
+        /**
+         * Attributes are read-only through JMX.
+         */
+        @Override
+        public AttributeList setAttributes(AttributeList list) {
+            throw new UnsupportedOperationException("Set not allowed.");
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
new file mode 100644
index 0000000..260f23f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -0,0 +1,55 @@
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.utils.Time;
+
+public final class KafkaMetric implements Metric {
+
+ private final String name;
+ private final String description;
+ private final Object lock;
+ private final Time time;
+ private final Measurable measurable;
+ private MetricConfig config;
+
+ KafkaMetric(Object lock, String name, String description, Measurable measurable, MetricConfig config, Time time) {
+ super();
+ this.name = name;
+ this.description = description;
+ this.lock = lock;
+ this.measurable = measurable;
+ this.config = config;
+ this.time = time;
+ }
+
+ MetricConfig config() {
+ return this.config;
+ }
+
+ @Override
+ public String name() {
+ return this.name;
+ }
+
+ @Override
+ public String description() {
+ return this.description;
+ }
+
+ @Override
+ public double value() {
+ synchronized (this.lock) {
+ return value(time.nanoseconds());
+ }
+ }
+
+ double value(long time) {
+ return this.measurable.measure(config, time);
+ }
+
+ public void config(MetricConfig config) {
+ synchronized (lock) {
+ this.config = config;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
new file mode 100644
index 0000000..9086405
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
@@ -0,0 +1,16 @@
+package org.apache.kafka.common.metrics;
+
+/**
+ * A quantity that can be measured and registered as a metric.
+ */
+public interface Measurable {
+
+    /**
+     * Take a measurement of this quantity.
+     *
+     * @param config The configuration for this metric
+     * @param now The time at which the measurement is being taken
+     * @return The measured value, as a double
+     */
+    double measure(MetricConfig config, long now);
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/metrics/MeasurableStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MeasurableStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/MeasurableStat.java
new file mode 100644
index 0000000..314f726
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MeasurableStat.java
@@ -0,0 +1,10 @@
+package org.apache.kafka.common.metrics;
+
+/**
+ * A MeasurableStat is a {@link Stat} that is also {@link Measurable} (i.e. can produce a single floating point value).
+ * This is the interface used for most of the simple statistics such as {@link org.apache.kafka.common.metrics.stats.Avg},
+ * {@link org.apache.kafka.common.metrics.stats.Max}, {@link org.apache.kafka.common.metrics.stats.Count}, etc.
+ * <p>
+ * It declares no methods of its own; it only combines the two parent interfaces into a single type.
+ */
+public interface MeasurableStat extends Stat, Measurable {
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
new file mode 100644
index 0000000..f458584
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
@@ -0,0 +1,71 @@
+package org.apache.kafka.common.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Configuration values for metrics. Every setter returns this instance so that calls can be chained.
+ */
+public class MetricConfig {
+
+    private Quota quota;
+    private int samples;
+    private long eventWindow;
+    private long timeWindowNs;
+    private TimeUnit unit;
+
+    /**
+     * Create a configuration with the defaults: no quota, 2 samples, an event window of {@link Long#MAX_VALUE}, a
+     * time window of 30 seconds and a time unit of seconds.
+     */
+    public MetricConfig() {
+        this.unit = TimeUnit.SECONDS;
+        this.timeWindowNs = TimeUnit.SECONDS.toNanos(30);
+        this.eventWindow = Long.MAX_VALUE;
+        this.samples = 2;
+        this.quota = null;
+    }
+
+    /**
+     * @return the configured quota, or null if none has been set
+     */
+    public Quota quota() {
+        return quota;
+    }
+
+    public MetricConfig quota(Quota quota) {
+        this.quota = quota;
+        return this;
+    }
+
+    public long eventWindow() {
+        return this.eventWindow;
+    }
+
+    public MetricConfig eventWindow(long window) {
+        this.eventWindow = window;
+        return this;
+    }
+
+    /**
+     * @return the time window in nanoseconds
+     */
+    public long timeWindowNs() {
+        return this.timeWindowNs;
+    }
+
+    /**
+     * Set the time window. The value is stored in nanoseconds; the unit returned by {@link #timeUnit()} is not
+     * changed by this call.
+     * @param window The length of the window, expressed in the given unit
+     * @param unit The unit of the window argument
+     */
+    public MetricConfig timeWindow(long window, TimeUnit unit) {
+        this.timeWindowNs = unit.toNanos(window);
+        return this;
+    }
+
+    public int samples() {
+        return samples;
+    }
+
+    /**
+     * @param samples The number of samples, which must be at least 1
+     * @throws IllegalArgumentException if samples is less than 1
+     */
+    public MetricConfig samples(int samples) {
+        if (samples <= 0)
+            throw new IllegalArgumentException("The number of samples must be at least 1.");
+        this.samples = samples;
+        return this;
+    }
+
+    public TimeUnit timeUnit() {
+        return this.unit;
+    }
+
+    public MetricConfig timeUnit(TimeUnit unit) {
+        this.unit = unit;
+        return this;
+    }
+}