Posted to commits@kafka.apache.org by jk...@apache.org on 2014/01/28 20:16:31 UTC

[5/7] KAFKA-1227 New producer!

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Metrics.java b/clients/src/main/java/kafka/common/metrics/Metrics.java
new file mode 100644
index 0000000..f2cb782
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/Metrics.java
@@ -0,0 +1,190 @@
+package kafka.common.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import kafka.common.utils.SystemTime;
+import kafka.common.utils.Time;
+import kafka.common.utils.Utils;
+
+/**
+ * A registry of sensors and metrics.
+ * <p>
+ * A metric is a named, numerical measurement. A sensor is a handle to record numerical measurements as they occur. Each
+ * sensor has zero or more associated metrics. For example, a sensor might represent message sizes, and we might associate
+ * with it metrics for the average, maximum, or other statistics computed over the sequence of message sizes
+ * recorded by the sensor.
+ * <p>
+ * Usage looks something like this:
+ * 
+ * <pre>
+ * // set up metrics:
+ * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
+ * Sensor sensor = metrics.sensor(&quot;message-sizes&quot;);
+ * sensor.add(&quot;kafka.producer.message-sizes.avg&quot;, new Avg());
+ * sensor.add(&quot;kafka.producer.message-sizes.max&quot;, new Max());
+ * 
+ * // as messages are sent we record the sizes
+ * sensor.record(messageSize);
+ * </pre>
+ */
+public class Metrics {
+
+    private final MetricConfig config;
+    private final ConcurrentMap<String, KafkaMetric> metrics;
+    private final ConcurrentMap<String, Sensor> sensors;
+    private final List<MetricsReporter> reporters;
+    private final Time time;
+
+    /**
+     * Create a metrics repository with no metric reporters and default configuration.
+     */
+    public Metrics() {
+        this(new MetricConfig());
+    }
+
+    /**
+     * Create a metrics repository with no metric reporters, the default configuration, and the given clock.
+     */
+    public Metrics(Time time) {
+        this(new MetricConfig(), new ArrayList<MetricsReporter>(), time);
+    }
+
+    /**
+     * Create a metrics repository with no reporters and the given default config. This config will be used for any
+     * metric that doesn't override its own config.
+     * @param defaultConfig The default config to use for all metrics that don't override their config
+     */
+    public Metrics(MetricConfig defaultConfig) {
+        this(defaultConfig, new ArrayList<MetricsReporter>(0), new SystemTime());
+    }
+
+    /**
+     * Create a metrics repository with a default config and the given metric reporters
+     * @param defaultConfig The default config
+     * @param reporters The metrics reporters
+     * @param time The time instance to use with the metrics
+     */
+    public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time) {
+        this.config = defaultConfig;
+        this.sensors = new ConcurrentHashMap<String, Sensor>();
+        this.metrics = new ConcurrentHashMap<String, KafkaMetric>();
+        this.reporters = Utils.notNull(reporters);
+        this.time = time;
+        for (MetricsReporter reporter : reporters)
+            reporter.init(new ArrayList<KafkaMetric>());
+    }
+
+    /**
+     * Create a sensor with the given unique name and zero or more parent sensors. All parent sensors will receive every
+     * value recorded with this sensor.
+     * @param name The name of the sensor
+     * @param parents The parent sensors
+     * @return The sensor that is created
+     */
+    public Sensor sensor(String name, Sensor... parents) {
+        return sensor(name, null, parents);
+    }
+
+    /**
+     * Create a sensor with the given unique name and zero or more parent sensors. All parent sensors will receive every
+     * value recorded with this sensor.
+     * @param name The name of the sensor
+     * @param config A default configuration to use for this sensor for metrics that don't have their own config
+     * @param parents The parent sensors
+     * @return The sensor that is created
+     */
+    public synchronized Sensor sensor(String name, MetricConfig config, Sensor... parents) {
+        Sensor s = this.sensors.get(Utils.notNull(name));
+        if (s == null) {
+            s = new Sensor(this, name, parents, config == null ? this.config : config, time);
+            this.sensors.put(name, s);
+        }
+        return s;
+    }
+
+    /**
+     * Add a metric to monitor an object that implements Measurable. This metric won't be associated with any sensor.
+     * This is a way to expose existing values as metrics.
+     * @param name The name of the metric
+     * @param measurable The measurable that will be measured by this metric
+     */
+    public void addMetric(String name, Measurable measurable) {
+        addMetric(name, "", measurable);
+    }
+
+    /**
+     * Add a metric to monitor an object that implements Measurable. This metric won't be associated with any sensor.
+     * This is a way to expose existing values as metrics.
+     * @param name The name of the metric
+     * @param description A human-readable description to include in the metric
+     * @param measurable The measurable that will be measured by this metric
+     */
+    public void addMetric(String name, String description, Measurable measurable) {
+        addMetric(name, description, null, measurable);
+    }
+
+    /**
+     * Add a metric to monitor an object that implements Measurable. This metric won't be associated with any sensor.
+     * This is a way to expose existing values as metrics.
+     * @param name The name of the metric
+     * @param config The configuration to use when measuring this measurable
+     * @param measurable The measurable that will be measured by this metric
+     */
+    public void addMetric(String name, MetricConfig config, Measurable measurable) {
+        addMetric(name, "", config, measurable);
+    }
+
+    /**
+     * Add a metric to monitor an object that implements Measurable. This metric won't be associated with any sensor.
+     * This is a way to expose existing values as metrics.
+     * @param name The name of the metric
+     * @param description A human-readable description to include in the metric
+     * @param config The configuration to use when measuring this measurable
+     * @param measurable The measurable that will be measured by this metric
+     */
+    public synchronized void addMetric(String name, String description, MetricConfig config, Measurable measurable) {
+        KafkaMetric m = new KafkaMetric(new Object(),
+                                        Utils.notNull(name),
+                                        Utils.notNull(description),
+                                        Utils.notNull(measurable),
+                                        config == null ? this.config : config,
+                                        time);
+        registerMetric(m);
+    }
+
+    /**
+     * Add a MetricsReporter
+     */
+    public synchronized void addReporter(MetricsReporter reporter) {
+        Utils.notNull(reporter).init(new ArrayList<KafkaMetric>(metrics.values()));
+        this.reporters.add(reporter);
+    }
+
+    synchronized void registerMetric(KafkaMetric metric) {
+        if (this.metrics.containsKey(metric.name()))
+            throw new IllegalArgumentException("A metric named '" + metric.name() + "' already exists, can't register another one.");
+        this.metrics.put(metric.name(), metric);
+        for (MetricsReporter reporter : reporters)
+            reporter.metricChange(metric);
+    }
+
+    /**
+     * Get all the metrics currently maintained indexed by metric name
+     */
+    public Map<String, KafkaMetric> metrics() {
+        return this.metrics;
+    }
+
+    /**
+     * Close this metrics repository.
+     */
+    public void close() {
+        for (MetricsReporter reporter : this.reporters)
+            reporter.close();
+    }
+
+}
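
For illustration, a value that already exists in the program can be exposed directly as a metric via addMetric, with no sensor involved. A minimal sketch, using only the APIs introduced in this patch (the queue is just a stand-in data source):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    import kafka.common.metrics.Measurable;
    import kafka.common.metrics.MetricConfig;
    import kafka.common.metrics.Metrics;

    public class QueueSizeGaugeExample {
        public static void main(String[] args) {
            final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
            Metrics metrics = new Metrics();
            // addMetric registers a sensor-less metric; measure() is polled on demand
            metrics.addMetric("queue.size", "The current number of queued items", new Measurable() {
                public double measure(MetricConfig config, long now) {
                    return queue.size();
                }
            });
        }
    }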

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/kafka/common/metrics/MetricsReporter.java
new file mode 100644
index 0000000..bf0b39e
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/MetricsReporter.java
@@ -0,0 +1,27 @@
+package kafka.common.metrics;
+
+import java.util.List;
+
+/**
+ * A plugin interface that allows listening for the creation and alteration of metrics so they can be reported
+ */
+public interface MetricsReporter {
+
+    /**
+     * This is called when the reporter is first registered to initially register all existing metrics
+     * @param metrics All currently existing metrics
+     */
+    public void init(List<KafkaMetric> metrics);
+
+    /**
+     * This is called whenever a metric is updated or added
+     * @param metric The metric that was added or changed
+     */
+    public void metricChange(KafkaMetric metric);
+
+    /**
+     * Called when the metrics repository is closed.
+     */
+    public void close();
+
+}
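
A minimal sketch of a reporter, assuming nothing beyond the interface above and KafkaMetric's name() accessor (used elsewhere in this patch): it simply logs metrics as they appear, and can be attached with metrics.addReporter(new LoggingReporter()).

    import java.util.List;

    import kafka.common.metrics.KafkaMetric;
    import kafka.common.metrics.MetricsReporter;

    public class LoggingReporter implements MetricsReporter {

        @Override
        public void init(List<KafkaMetric> metrics) {
            // called once at registration with all pre-existing metrics
            for (KafkaMetric metric : metrics)
                System.out.println("existing metric: " + metric.name());
        }

        @Override
        public void metricChange(KafkaMetric metric) {
            System.out.println("metric added or changed: " + metric.name());
        }

        @Override
        public void close() {
            // nothing to clean up
        }
    }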

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/Quota.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Quota.java b/clients/src/main/java/kafka/common/metrics/Quota.java
new file mode 100644
index 0000000..6278246
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/Quota.java
@@ -0,0 +1,36 @@
+package kafka.common.metrics;
+
+/**
+ * An upper or lower bound for metrics
+ */
+public final class Quota {
+
+    private final boolean upper;
+    private final double bound;
+
+    public Quota(double bound, boolean upper) {
+        this.bound = bound;
+        this.upper = upper;
+    }
+
+    public static Quota lessThan(double upperBound) {
+        return new Quota(upperBound, true);
+    }
+
+    public static Quota moreThan(double lowerBound) {
+        return new Quota(lowerBound, false);
+    }
+
+    public boolean isUpperBound() {
+        return this.upper;
+    }
+
+    public double bound() {
+        return this.bound;
+    }
+
+    public boolean acceptable(double value) {
+        return (upper && value <= bound) || (!upper && value >= bound);
+    }
+
+}
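
The bound semantics are easiest to see in isolation. Elsewhere in this patch, Sensor.checkQuotas() consults the Quota attached to each metric's MetricConfig and throws QuotaViolationException when acceptable() returns false. A small sketch of the class on its own:

    import kafka.common.metrics.Quota;

    public class QuotaExample {
        public static void main(String[] args) {
            Quota maxRate = Quota.lessThan(1024.0); // upper bound
            Quota minRate = Quota.moreThan(10.0);   // lower bound
            System.out.println(maxRate.acceptable(512.0)); // true: 512 <= 1024
            System.out.println(minRate.acceptable(5.0));   // false: 5 < 10
        }
    }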

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java b/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java
new file mode 100644
index 0000000..b9005cd
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java
@@ -0,0 +1,16 @@
+package kafka.common.metrics;
+
+import kafka.common.KafkaException;
+
+/**
+ * Thrown when a sensor records a value that causes a metric to go outside the bounds configured as its quota
+ */
+public class QuotaViolationException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public QuotaViolationException(String m) {
+        super(m);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Sensor.java b/clients/src/main/java/kafka/common/metrics/Sensor.java
new file mode 100644
index 0000000..9c11835
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/Sensor.java
@@ -0,0 +1,171 @@
+package kafka.common.metrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import kafka.common.metrics.CompoundStat.NamedMeasurable;
+import kafka.common.utils.Time;
+import kafka.common.utils.Utils;
+
+/**
+ * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example, a sensor on
+ * message size would record a sequence of message sizes using the {@link #record(double)} API and would maintain a set
+ * of metrics about message sizes such as the average or max.
+ */
+public final class Sensor {
+
+    private final Metrics registry;
+    private final String name;
+    private final Sensor[] parents;
+    private final List<Stat> stats;
+    private final List<KafkaMetric> metrics;
+    private final MetricConfig config;
+    private final Time time;
+
+    Sensor(Metrics registry, String name, Sensor[] parents, MetricConfig config, Time time) {
+        super();
+        this.registry = registry;
+        this.name = Utils.notNull(name);
+        this.parents = parents;
+        this.metrics = new ArrayList<KafkaMetric>();
+        this.stats = new ArrayList<Stat>();
+        this.config = config;
+        this.time = time;
+        checkForest(new HashSet<Sensor>());
+    }
+
+    /* Validate that this sensor doesn't end up referencing itself */
+    private void checkForest(Set<Sensor> sensors) {
+        if (!sensors.add(this))
+            throw new IllegalArgumentException("Circular dependency in sensors: " + name() + " is its own parent.");
+        for (int i = 0; i < parents.length; i++)
+            parents[i].checkForest(sensors);
+    }
+
+    /**
+     * The name this sensor is registered with. This name will be unique among all registered sensors.
+     */
+    public String name() {
+        return this.name;
+    }
+
+    /**
+     * Record an occurrence; this is just shorthand for {@link #record(double) record(1.0)}
+     */
+    public void record() {
+        record(1.0);
+    }
+
+    /**
+     * Record a value with this sensor
+     * @param value The value to record
+     * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
+     *         bound
+     */
+    public void record(double value) {
+        record(value, time.nanoseconds());
+    }
+
+    private void record(double value, long time) {
+        synchronized (this) {
+            // increment all the stats
+            for (int i = 0; i < this.stats.size(); i++)
+                this.stats.get(i).record(config, value, time);
+            checkQuotas(time);
+
+        }
+        for (int i = 0; i < parents.length; i++)
+            parents[i].record(value, time);
+    }
+
+    private void checkQuotas(long time) {
+        for (int i = 0; i < this.metrics.size(); i++) {
+            KafkaMetric metric = this.metrics.get(i);
+            MetricConfig config = metric.config();
+            if (config != null) {
+                Quota quota = config.quota();
+                if (quota != null)
+                    if (!quota.acceptable(metric.value(time)))
+                        throw new QuotaViolationException("Metric " + metric.name() + " is in violation of its quota of " + quota.bound());
+            }
+        }
+    }
+
+    /**
+     * Register a compound statistic with this sensor with no config override
+     */
+    public void add(CompoundStat stat) {
+        add(stat, null);
+    }
+
+    /**
+     * Register a compound statistic with this sensor which yields multiple measurable quantities (like a histogram)
+     * @param stat The stat to register
+     * @param config The configuration for this stat. If null then the stat will use the default configuration for this
+     *        sensor.
+     */
+    public synchronized void add(CompoundStat stat, MetricConfig config) {
+        this.stats.add(Utils.notNull(stat));
+        for (NamedMeasurable m : stat.stats()) {
+            KafkaMetric metric = new KafkaMetric(this, m.name(), m.description(), m.stat(), config == null ? this.config : config, time);
+            this.registry.registerMetric(metric);
+            this.metrics.add(metric);
+        }
+    }
+
+    /**
+     * Add a metric with default configuration and no description. Equivalent to
+     * {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, "", stat, null)}
+     * 
+     */
+    public void add(String name, MeasurableStat stat) {
+        add(name, stat, null);
+    }
+
+    /**
+     * Add a metric with default configuration. Equivalent to
+     * {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, description, stat, null)}
+     * 
+     */
+    public void add(String name, String description, MeasurableStat stat) {
+        add(name, description, stat, null);
+    }
+
+    /**
+     * Add a metric to this sensor with no description. Equivalent to
+     * {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, "", stat, config)}
+     * @param name The name of the metric
+     * @param stat The statistic to keep
+     * @param config A special configuration for this metric. If null use the sensor default configuration.
+     */
+    public void add(String name, MeasurableStat stat, MetricConfig config) {
+        add(name, "", stat, config);
+    }
+
+    /**
+     * Register a metric with this sensor
+     * @param name The name of the metric
+     * @param description A description used when reporting the value
+     * @param stat The statistic to keep
+     * @param config A special configuration for this metric. If null use the sensor default configuration.
+     */
+    public synchronized void add(String name, String description, MeasurableStat stat, MetricConfig config) {
+        KafkaMetric metric = new KafkaMetric(this,
+                                             Utils.notNull(name),
+                                             Utils.notNull(description),
+                                             Utils.notNull(stat),
+                                             config == null ? this.config : config,
+                                             time);
+        this.registry.registerMetric(metric);
+        this.metrics.add(metric);
+        this.stats.add(stat);
+    }
+
+    synchronized List<KafkaMetric> metrics() {
+        return Collections.unmodifiableList(this.metrics);
+    }
+
+}
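
The parent mechanism makes it cheap to keep per-topic and global statistics in sync, since a child records into its parents automatically. A minimal sketch using the classes from this patch (metric names are illustrative):

    import kafka.common.metrics.Metrics;
    import kafka.common.metrics.Sensor;
    import kafka.common.metrics.stats.Avg;
    import kafka.common.metrics.stats.Max;

    public class SensorHierarchyExample {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();

            // a global sensor, and a per-topic sensor that feeds it
            Sensor allSizes = metrics.sensor("message-sizes.all");
            allSizes.add("kafka.producer.message-sizes.avg", new Avg());

            Sensor topicSizes = metrics.sensor("message-sizes.my-topic", allSizes);
            topicSizes.add("kafka.producer.my-topic.message-sizes.max", new Max());

            // recording on the child also records on its parent
            topicSizes.record(1024.0);
        }
    }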

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/Stat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Stat.java b/clients/src/main/java/kafka/common/metrics/Stat.java
new file mode 100644
index 0000000..8844545
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/Stat.java
@@ -0,0 +1,16 @@
+package kafka.common.metrics;
+
+/**
+ * A Stat is a quantity, such as an average or max, that is computed off the stream of updates to a sensor
+ */
+public interface Stat {
+
+    /**
+     * Record the given value
+     * @param config The configuration to use for this metric
+     * @param value The value to record
+     * @param time The time this value occurred
+     */
+    public void record(MetricConfig config, double value, long time);
+
+}
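
Implementing a new statistic requires only this one method plus, for anything registered via Sensor.add(String, MeasurableStat), a measure() method. A trivial sketch, assuming MeasurableStat simply combines Stat and Measurable (that interface is not shown in this hunk):

    import kafka.common.metrics.MeasurableStat;
    import kafka.common.metrics.MetricConfig;

    // Reports the most recently recorded value
    public class LastValue implements MeasurableStat {

        private double value = 0.0;

        @Override
        public void record(MetricConfig config, double value, long time) {
            this.value = value;
        }

        @Override
        public double measure(MetricConfig config, long now) {
            return this.value;
        }
    }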

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Avg.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Avg.java b/clients/src/main/java/kafka/common/metrics/stats/Avg.java
new file mode 100644
index 0000000..b9d3d5d
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Avg.java
@@ -0,0 +1,33 @@
+package kafka.common.metrics.stats;
+
+import java.util.List;
+
+import kafka.common.metrics.MetricConfig;
+
+/**
+ * A {@link SampledStat} that maintains a simple average over its samples.
+ */
+public class Avg extends SampledStat {
+
+    public Avg() {
+        super(0.0);
+    }
+
+    @Override
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
+        sample.value += value;
+    }
+
+    @Override
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
+        double total = 0.0;
+        long count = 0;
+        for (int i = 0; i < samples.size(); i++) {
+            Sample s = samples.get(i);
+            total += s.value;
+            count += s.eventCount;
+        }
+        return total / count;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Count.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Count.java b/clients/src/main/java/kafka/common/metrics/stats/Count.java
new file mode 100644
index 0000000..3712e78
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Count.java
@@ -0,0 +1,29 @@
+package kafka.common.metrics.stats;
+
+import java.util.List;
+
+import kafka.common.metrics.MetricConfig;
+
+/**
+ * A {@link SampledStat} that maintains a simple count of what it has seen.
+ */
+public class Count extends SampledStat {
+
+    public Count() {
+        super(0);
+    }
+
+    @Override
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
+        sample.value += 1.0;
+    }
+
+    @Override
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
+        double total = 0.0;
+        for (int i = 0; i < samples.size(); i++)
+            total += samples.get(i).value;
+        return total;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Histogram.java b/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
new file mode 100644
index 0000000..c59b585
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
@@ -0,0 +1,137 @@
+package kafka.common.metrics.stats;
+
+public class Histogram {
+
+    private final BinScheme binScheme;
+    private final float[] hist;
+    private double count;
+
+    public Histogram(BinScheme binScheme) {
+        this.hist = new float[binScheme.bins()];
+        this.count = 0.0f;
+        this.binScheme = binScheme;
+    }
+
+    public void record(double value) {
+        this.hist[binScheme.toBin(value)] += 1.0f;
+        this.count += 1.0f;
+    }
+
+    public double value(double quantile) {
+        if (count == 0L)
+            return Double.NaN;
+        float sum = 0.0f;
+        float quant = (float) quantile;
+        for (int i = 0; i < this.hist.length - 1; i++) {
+            sum += this.hist[i];
+            if (sum / count > quant)
+                return binScheme.fromBin(i);
+        }
+        return Float.POSITIVE_INFINITY;
+    }
+
+    public void clear() {
+        for (int i = 0; i < this.hist.length; i++)
+            this.hist[i] = 0.0f;
+        this.count = 0;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder b = new StringBuilder("{"); // a char literal here would widen to int and be treated as a capacity
+        for (int i = 0; i < this.hist.length - 1; i++) {
+            b.append(String.format("%.10f", binScheme.fromBin(i)));
+            b.append(':');
+            b.append(String.format("%.0f", this.hist[i]));
+            b.append(',');
+        }
+        b.append(Float.POSITIVE_INFINITY);
+        b.append(':');
+        b.append(this.hist[this.hist.length - 1]);
+        b.append('}');
+        return b.toString();
+    }
+
+    public interface BinScheme {
+        public int bins();
+
+        public int toBin(double value);
+
+        public double fromBin(int bin);
+    }
+
+    public static class ConstantBinScheme implements BinScheme {
+        private final double min;
+        private final double max;
+        private final int bins;
+        private final double bucketWidth;
+
+        public ConstantBinScheme(int bins, double min, double max) {
+            if (bins < 2)
+                throw new IllegalArgumentException("Must have at least 2 bins.");
+            this.min = min;
+            this.max = max;
+            this.bins = bins;
+            this.bucketWidth = (max - min) / (bins - 2);
+        }
+
+        public int bins() {
+            return this.bins;
+        }
+
+        public double fromBin(int b) {
+            if (b == 0)
+                return Double.NEGATIVE_INFINITY;
+            else if (b == bins - 1)
+                return Double.POSITIVE_INFINITY;
+            else
+                return min + (b - 1) * bucketWidth;
+        }
+
+        public int toBin(double x) {
+            if (x < min)
+                return 0;
+            else if (x > max)
+                return bins - 1;
+            else
+                return (int) ((x - min) / bucketWidth) + 1;
+        }
+    }
+
+    public static class LinearBinScheme implements BinScheme {
+        private final int bins;
+        private final double max;
+        private final double scale;
+
+        public LinearBinScheme(int numBins, double max) {
+            this.bins = numBins;
+            this.max = max;
+            this.scale = max / (numBins * (numBins - 1) / 2);
+        }
+
+        public int bins() {
+            return this.bins;
+        }
+
+        public double fromBin(int b) {
+            if (b == this.bins - 1) {
+                return Float.POSITIVE_INFINITY;
+            } else {
+                double unscaled = (b * (b + 1.0)) / 2.0; // inverse of toBin, which solves b * (b + 1) / 2 = x / scale
+                return unscaled * this.scale;
+            }
+        }
+
+        public int toBin(double x) {
+            if (x < 0.0d) {
+                throw new IllegalArgumentException("Values less than 0.0 not accepted.");
+            } else if (x > this.max) {
+                return this.bins - 1;
+            } else {
+                double scaled = x / this.scale;
+                return (int) (-0.5 + Math.sqrt(2.0 * scaled + 0.25));
+            }
+        }
+    }
+
+}
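
The bin schemes reserve their first and last bins as catch-alls: with ConstantBinScheme, bin 0 holds everything below min, the last bin everything above max, and the interior bins are evenly spaced. A small sketch of recording values and querying a quantile:

    import kafka.common.metrics.stats.Histogram;
    import kafka.common.metrics.stats.Histogram.ConstantBinScheme;

    public class HistogramExample {
        public static void main(String[] args) {
            // 12 bins: one underflow, one overflow, and 10 interior bins of width 10 over [0, 100)
            Histogram hist = new Histogram(new ConstantBinScheme(12, 0.0, 100.0));
            for (int i = 1; i <= 100; i++)
                hist.record(i);
            System.out.println(hist.value(0.5)); // approximate median, around 50
        }
    }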

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Max.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Max.java b/clients/src/main/java/kafka/common/metrics/stats/Max.java
new file mode 100644
index 0000000..e7bd1d2
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Max.java
@@ -0,0 +1,29 @@
+package kafka.common.metrics.stats;
+
+import java.util.List;
+
+import kafka.common.metrics.MetricConfig;
+
+/**
+ * A {@link SampledStat} that gives the max over its samples.
+ */
+public final class Max extends SampledStat {
+
+    public Max() {
+        super(Double.NEGATIVE_INFINITY);
+    }
+
+    @Override
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
+        sample.value = Math.max(sample.value, value);
+    }
+
+    @Override
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
+        double max = Double.NEGATIVE_INFINITY;
+        for (int i = 0; i < samples.size(); i++)
+            max = Math.max(max, samples.get(i).value);
+        return max;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Min.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Min.java b/clients/src/main/java/kafka/common/metrics/stats/Min.java
new file mode 100644
index 0000000..db0ab92
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Min.java
@@ -0,0 +1,29 @@
+package kafka.common.metrics.stats;
+
+import java.util.List;
+
+import kafka.common.metrics.MetricConfig;
+
+/**
+ * A {@link SampledStat} that gives the min over its samples.
+ */
+public class Min extends SampledStat {
+
+    public Min() {
+        super(Double.MAX_VALUE); // Double.MIN_VALUE is the smallest positive value, not the most negative
+    }
+
+    @Override
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
+        sample.value = Math.min(sample.value, value);
+    }
+
+    @Override
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
+        double min = Double.MAX_VALUE;
+        for (int i = 0; i < samples.size(); i++)
+            min = Math.min(min, samples.get(i).value);
+        return min;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Percentile.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Percentile.java b/clients/src/main/java/kafka/common/metrics/stats/Percentile.java
new file mode 100644
index 0000000..84320bb
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Percentile.java
@@ -0,0 +1,32 @@
+package kafka.common.metrics.stats;
+
+public class Percentile {
+
+    private final String name;
+    private final String description;
+    private final double percentile;
+
+    public Percentile(String name, double percentile) {
+        this(name, "", percentile);
+    }
+
+    public Percentile(String name, String description, double percentile) {
+        super();
+        this.name = name;
+        this.description = description;
+        this.percentile = percentile;
+    }
+
+    public String name() {
+        return this.name;
+    }
+
+    public String description() {
+        return this.description;
+    }
+
+    public double percentile() {
+        return this.percentile;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
new file mode 100644
index 0000000..686c726
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
@@ -0,0 +1,76 @@
+package kafka.common.metrics.stats;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import kafka.common.metrics.CompoundStat;
+import kafka.common.metrics.Measurable;
+import kafka.common.metrics.MetricConfig;
+import kafka.common.metrics.stats.Histogram.BinScheme;
+import kafka.common.metrics.stats.Histogram.ConstantBinScheme;
+import kafka.common.metrics.stats.Histogram.LinearBinScheme;
+
+/**
+ * A compound stat that reports one or more percentiles
+ */
+public class Percentiles implements CompoundStat {
+
+    public static enum BucketSizing {
+        CONSTANT, LINEAR
+    }
+
+    private final Percentile[] percentiles;
+    private Histogram current;
+    private Histogram shadow;
+    private long lastWindow;
+    private long eventCount;
+
+    public Percentiles(int sizeInBytes, double max, BucketSizing bucketing, Percentile... percentiles) {
+        this(sizeInBytes, 0.0, max, bucketing, percentiles);
+    }
+
+    public Percentiles(int sizeInBytes, double min, double max, BucketSizing bucketing, Percentile... percentiles) {
+        this.percentiles = percentiles;
+        BinScheme scheme = null;
+        if (bucketing == BucketSizing.CONSTANT) {
+            scheme = new ConstantBinScheme(sizeInBytes / 4, min, max);
+        } else if (bucketing == BucketSizing.LINEAR) {
+            if (min != 0.0d)
+                throw new IllegalArgumentException("Linear bucket sizing requires min to be 0.0.");
+            scheme = new LinearBinScheme(sizeInBytes / 4, max);
+        }
+        this.current = new Histogram(scheme);
+        this.shadow = new Histogram(scheme);
+        this.eventCount = 0L;
+    }
+
+    @Override
+    public List<NamedMeasurable> stats() {
+        List<NamedMeasurable> ms = new ArrayList<NamedMeasurable>(this.percentiles.length);
+        for (Percentile percentile : this.percentiles) {
+            final double pct = percentile.percentile();
+            ms.add(new NamedMeasurable(percentile.name(), percentile.description(), new Measurable() {
+                public double measure(MetricConfig config, long now) {
+                    return current.value(pct / 100.0);
+                }
+            }));
+        }
+        return ms;
+    }
+
+    @Override
+    public void record(MetricConfig config, double value, long time) {
+        this.eventCount += 1;
+        long elapsed = time - this.lastWindow;
+        if (elapsed > config.timeWindowNs() / 2 || this.eventCount > config.eventWindow() / 2)
+            this.shadow.clear();
+        if (elapsed > config.timeWindowNs() || this.eventCount > config.eventWindow()) {
+            Histogram tmp = this.current;
+            this.current = this.shadow;
+            this.shadow = tmp;
+            this.shadow.clear();
+            this.lastWindow = time;
+            this.eventCount = 0;
+        }
+        this.current.record(value);
+        this.shadow.record(value);
+    }
+
+}
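
Since each bin is a float, the sizeInBytes argument translates to sizeInBytes / 4 bins. A sketch of registering percentile metrics on a sensor (metric names are illustrative):

    import kafka.common.metrics.Metrics;
    import kafka.common.metrics.Sensor;
    import kafka.common.metrics.stats.Percentile;
    import kafka.common.metrics.stats.Percentiles;
    import kafka.common.metrics.stats.Percentiles.BucketSizing;

    public class PercentilesExample {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            Sensor sensor = metrics.sensor("request-sizes");
            // 1000 bytes of histogram space = 250 bins, linearly sized over [0, 1000]
            sensor.add(new Percentiles(1000, 1000.0, BucketSizing.LINEAR,
                                       new Percentile("request-sizes.p50", 50.0),
                                       new Percentile("request-sizes.p99", 99.0)));
            sensor.record(100.0);
        }
    }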

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/kafka/common/metrics/stats/Rate.java
new file mode 100644
index 0000000..3f24a92
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Rate.java
@@ -0,0 +1,85 @@
+package kafka.common.metrics.stats;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import kafka.common.metrics.MeasurableStat;
+import kafka.common.metrics.MetricConfig;
+
+/**
+ * The rate of the given quantity. By default this is the total observed over a set of samples from a sampled statistic
+ * divided by the elapsed time over the sample windows. Alternative {@link SampledStat} implementations can be
+ * provided, however, to record the rate of occurrences (e.g. the count of values measured over the time interval) or
+ * other such values.
+ */
+public class Rate implements MeasurableStat {
+
+    private final TimeUnit unit;
+    private final SampledStat stat;
+
+    public Rate(TimeUnit unit) {
+        this(unit, new SampledTotal());
+    }
+
+    public Rate(TimeUnit unit, SampledStat stat) {
+        this.stat = stat;
+        this.unit = unit;
+    }
+
+    public String unitName() {
+        return unit.name().substring(0, unit.name().length() - 1).toLowerCase(); // e.g. SECONDS -> "second"
+    }
+
+    @Override
+    public void record(MetricConfig config, double value, long time) {
+        this.stat.record(config, value, time);
+    }
+
+    @Override
+    public double measure(MetricConfig config, long now) {
+        double elapsed = convert(now - stat.oldest().lastWindow);
+        return stat.measure(config, now) / elapsed;
+    }
+
+    private double convert(long time) {
+        switch (unit) {
+            case NANOSECONDS:
+                return time;
+            case MICROSECONDS:
+                return time / 1000.0;
+            case MILLISECONDS:
+                return time / (1000.0 * 1000.0);
+            case SECONDS:
+                return time / (1000.0 * 1000.0 * 1000.0);
+            case MINUTES:
+                return time / (60.0 * 1000.0 * 1000.0 * 1000.0);
+            case HOURS:
+                return time / (60.0 * 60.0 * 1000.0 * 1000.0 * 1000.0);
+            case DAYS:
+                return time / (24.0 * 60.0 * 60.0 * 1000.0 * 1000.0 * 1000.0);
+            default:
+                throw new IllegalStateException("Unknown unit: " + unit);
+        }
+    }
+
+    public static class SampledTotal extends SampledStat {
+
+        public SampledTotal() {
+            super(0.0d);
+        }
+
+        @Override
+        protected void update(Sample sample, MetricConfig config, double value, long now) {
+            sample.value += value;
+        }
+
+        @Override
+        public double combine(List<Sample> samples, MetricConfig config, long now) {
+            double total = 0.0;
+            for (int i = 0; i < samples.size(); i++)
+                total += samples.get(i).value;
+            return total;
+        }
+
+    }
+}
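
A sketch of the common case, a per-second rate of a recorded quantity (the metric name is illustrative):

    import java.util.concurrent.TimeUnit;

    import kafka.common.metrics.Metrics;
    import kafka.common.metrics.Sensor;
    import kafka.common.metrics.stats.Rate;

    public class RateExample {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            Sensor bytesSent = metrics.sensor("bytes-sent");
            // total bytes recorded over the sample windows, divided by the elapsed seconds
            bytesSent.add("kafka.producer.bytes-sent-per-sec", new Rate(TimeUnit.SECONDS));
            bytesSent.record(4096);
        }
    }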

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
new file mode 100644
index 0000000..6f820fa
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
@@ -0,0 +1,106 @@
+package kafka.common.metrics.stats;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import kafka.common.metrics.MeasurableStat;
+import kafka.common.metrics.MetricConfig;
+
+/**
+ * A SampledStat records a single scalar value measured over one or more samples. Each sample is recorded over a
+ * configurable window. The window can be defined by number of events or elapsed time (or both; if both are given the
+ * window is complete when <i>either</i> the event count or elapsed time criterion is met).
+ * <p>
+ * All the samples are combined to produce the measurement. When a window is complete the oldest sample is cleared and
+ * recycled to begin recording the next sample.
+ * 
+ * Subclasses of this class define different statistics measured using this basic pattern.
+ */
+public abstract class SampledStat implements MeasurableStat {
+
+    private double initialValue;
+    private int current = 0;
+    private List<Sample> samples;
+
+    public SampledStat(double initialValue) {
+        this.initialValue = initialValue;
+        this.samples = new ArrayList<Sample>(2);
+    }
+
+    @Override
+    public void record(MetricConfig config, double value, long now) {
+        Sample sample = current(now);
+        if (sample.isComplete(now, config))
+            sample = advance(config, now);
+        update(sample, config, value, now);
+        sample.eventCount += 1;
+    }
+
+    private Sample advance(MetricConfig config, long now) {
+        this.current = (this.current + 1) % config.samples();
+        if (this.current >= samples.size()) {
+            Sample sample = new Sample(this.initialValue, now);
+            this.samples.add(sample);
+            return sample;
+        } else {
+            Sample sample = current(now);
+            sample.reset(now);
+            return sample;
+        }
+    }
+
+    @Override
+    public double measure(MetricConfig config, long now) {
+        timeoutObsoleteSamples(config, now);
+        return combine(this.samples, config, now);
+    }
+
+    public Sample current(long now) {
+        if (samples.size() == 0)
+            this.samples.add(new Sample(initialValue, now));
+        return this.samples.get(this.current);
+    }
+
+    public Sample oldest() {
+        return this.samples.get((this.current + 1) % this.samples.size());
+    }
+
+    protected abstract void update(Sample sample, MetricConfig config, double value, long now);
+
+    public abstract double combine(List<Sample> samples, MetricConfig config, long now);
+
+    /* Timeout any windows that have expired in the absence of any events */
+    private void timeoutObsoleteSamples(MetricConfig config, long now) {
+        for (int i = 0; i < samples.size(); i++) {
+            int idx = (this.current + i) % samples.size();
+            Sample sample = this.samples.get(idx);
+            if (now - sample.lastWindow >= (i + 1) * config.timeWindowNs())
+                sample.reset(now);
+        }
+    }
+
+    protected static class Sample {
+        public double initialValue;
+        public long eventCount;
+        public long lastWindow;
+        public double value;
+
+        public Sample(double initialValue, long now) {
+            this.initialValue = initialValue;
+            this.eventCount = 0;
+            this.lastWindow = now;
+            this.value = initialValue;
+        }
+
+        public void reset(long now) {
+            this.eventCount = 0;
+            this.lastWindow = now;
+            this.value = initialValue;
+        }
+
+        public boolean isComplete(long now, MetricConfig config) {
+            return now - lastWindow >= config.timeWindowNs() || eventCount >= config.eventWindow();
+        }
+    }
+
+}
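
New windowed statistics follow the same pattern as Avg, Count, Max, and Min: pass the per-sample initial value to the constructor, fold each recorded value into the current Sample in update(), and aggregate the live samples in combine(). A hypothetical subclass as a sketch (placed in the same package so it can see the protected Sample class):

    package kafka.common.metrics.stats;

    import java.util.List;

    import kafka.common.metrics.MetricConfig;

    // Hypothetical: the sum of squared values over the sample windows
    public class SumOfSquares extends SampledStat {

        public SumOfSquares() {
            super(0.0);
        }

        @Override
        protected void update(Sample sample, MetricConfig config, double value, long now) {
            sample.value += value * value;
        }

        @Override
        public double combine(List<Sample> samples, MetricConfig config, long now) {
            double total = 0.0;
            for (int i = 0; i < samples.size(); i++)
                total += samples.get(i).value;
            return total;
        }
    }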

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Total.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Total.java b/clients/src/main/java/kafka/common/metrics/stats/Total.java
new file mode 100644
index 0000000..c87b1ba
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Total.java
@@ -0,0 +1,31 @@
+package kafka.common.metrics.stats;
+
+import kafka.common.metrics.MeasurableStat;
+import kafka.common.metrics.MetricConfig;
+
+/**
+ * An un-windowed cumulative total maintained over all time.
+ */
+public class Total implements MeasurableStat {
+
+    private double total;
+
+    public Total() {
+        this.total = 0.0;
+    }
+
+    public Total(double value) {
+        this.total = value;
+    }
+
+    @Override
+    public void record(MetricConfig config, double value, long time) {
+        this.total += value;
+    }
+
+    @Override
+    public double measure(MetricConfig config, long now) {
+        return this.total;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/ByteBufferReceive.java b/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
new file mode 100644
index 0000000..cb1aaae
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
@@ -0,0 +1,43 @@
+package kafka.common.network;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ScatteringByteChannel;
+
+/**
+ * A receive backed by an array of ByteBuffers
+ */
+public class ByteBufferReceive implements Receive {
+
+    private final int source;
+    private final ByteBuffer[] buffers;
+    private int remaining;
+
+    public ByteBufferReceive(int source, ByteBuffer... buffers) {
+        super();
+        this.source = source;
+        this.buffers = buffers;
+        for (int i = 0; i < buffers.length; i++)
+            remaining += buffers[i].remaining();
+    }
+
+    @Override
+    public int source() {
+        return source;
+    }
+
+    @Override
+    public boolean complete() {
+        return remaining <= 0;
+    }
+
+    @Override
+    public long readFrom(ScatteringByteChannel channel) throws IOException {
+        long read = channel.read(buffers);
+        if (read < 0)
+            throw new EOFException("Channel closed before receive was complete.");
+        remaining -= read;
+        return read;
+    }
+
+    public ByteBuffer[] reify() {
+        return buffers;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/kafka/common/network/ByteBufferSend.java
new file mode 100644
index 0000000..43bf963
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/ByteBufferSend.java
@@ -0,0 +1,54 @@
+package kafka.common.network;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+
+/**
+ * A send backed by an array of byte buffers
+ */
+public class ByteBufferSend implements Send {
+
+    private final int destination;
+    protected final ByteBuffer[] buffers;
+    private int remaining;
+
+    public ByteBufferSend(int destination, ByteBuffer... buffers) {
+        super();
+        this.destination = destination;
+        this.buffers = buffers;
+        for (int i = 0; i < buffers.length; i++)
+            remaining += buffers[i].remaining();
+    }
+
+    @Override
+    public int destination() {
+        return destination;
+    }
+
+    @Override
+    public boolean complete() {
+        return remaining <= 0;
+    }
+
+    @Override
+    public ByteBuffer[] reify() {
+        return this.buffers;
+    }
+
+    @Override
+    public int remaining() {
+        return this.remaining;
+    }
+
+    @Override
+    public long writeTo(GatheringByteChannel channel) throws IOException {
+        long written = channel.write(buffers);
+        if (written < 0)
+            throw new EOFException("This shouldn't happen.");
+        remaining -= written;
+        return written;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/NetworkReceive.java b/clients/src/main/java/kafka/common/network/NetworkReceive.java
new file mode 100644
index 0000000..68ae48e
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/NetworkReceive.java
@@ -0,0 +1,74 @@
+package kafka.common.network;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ScatteringByteChannel;
+
+/**
+ * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
+ */
+public class NetworkReceive implements Receive {
+
+    private final int source;
+    private final ByteBuffer size;
+    private ByteBuffer buffer;
+
+    public NetworkReceive(int source, ByteBuffer buffer) {
+        this.source = source;
+        this.buffer = buffer;
+        this.size = null;
+    }
+
+    public NetworkReceive(int source) {
+        this.source = source;
+        this.size = ByteBuffer.allocate(4);
+        this.buffer = null;
+    }
+
+    @Override
+    public int source() {
+        return source;
+    }
+
+    @Override
+    public boolean complete() {
+        return (size == null || !size.hasRemaining()) && buffer != null && !buffer.hasRemaining();
+    }
+
+    @Override
+    public ByteBuffer[] reify() {
+        return new ByteBuffer[] { this.buffer };
+    }
+
+    @Override
+    public long readFrom(ScatteringByteChannel channel) throws IOException {
+        int read = 0;
+        if (size.hasRemaining()) {
+            int bytesRead = channel.read(size);
+            if (bytesRead < 0)
+                throw new EOFException();
+            read += bytesRead;
+            if (!size.hasRemaining()) {
+                size.rewind();
+                int requestSize = size.getInt();
+                if (requestSize < 0)
+                    throw new IllegalStateException("Invalid request (size = " + requestSize + ")");
+                this.buffer = ByteBuffer.allocate(requestSize);
+            }
+        }
+        if (buffer != null) {
+            int bytesRead = channel.read(buffer);
+            if (bytesRead < 0)
+                throw new EOFException();
+            read += bytesRead;
+        }
+
+        return read;
+    }
+
+    public ByteBuffer payload() {
+        return this.buffer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/NetworkSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/NetworkSend.java b/clients/src/main/java/kafka/common/network/NetworkSend.java
new file mode 100644
index 0000000..4e4ac98
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/NetworkSend.java
@@ -0,0 +1,26 @@
+package kafka.common.network;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A size delimited Send that consists of a 4 byte network-ordered size N followed by N bytes of content
+ */
+public class NetworkSend extends ByteBufferSend {
+
+    public NetworkSend(int destination, ByteBuffer... buffers) {
+        super(destination, sizeDelimit(buffers));
+    }
+
+    private static ByteBuffer[] sizeDelimit(ByteBuffer[] buffers) {
+        int size = 0;
+        for (int i = 0; i < buffers.length; i++)
+            size += buffers[i].remaining();
+        ByteBuffer[] delimited = new ByteBuffer[buffers.length + 1];
+        delimited[0] = ByteBuffer.allocate(4);
+        delimited[0].putInt(size);
+        delimited[0].rewind();
+        System.arraycopy(buffers, 0, delimited, 1, buffers.length);
+        return delimited;
+    }
+
+}
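
The framing can be exercised end-to-end in-process: a NetworkSend written to one end of a java.nio Pipe can be read back by a NetworkReceive on the other. A minimal sketch, assuming only the classes in this patch:

    import java.nio.ByteBuffer;
    import java.nio.channels.Pipe;

    import kafka.common.network.NetworkReceive;
    import kafka.common.network.NetworkSend;

    public class FramingExample {
        public static void main(String[] args) throws Exception {
            Pipe pipe = Pipe.open();

            // the send prepends a 4-byte size header to the payload
            ByteBuffer payload = ByteBuffer.wrap("hello".getBytes("UTF-8"));
            NetworkSend send = new NetworkSend(0, payload);
            while (!send.complete())
                send.writeTo(pipe.sink());

            // the receive reads the 4-byte size, then allocates and fills the payload
            NetworkReceive receive = new NetworkReceive(0);
            while (!receive.complete())
                receive.readFrom(pipe.source());
            System.out.println(new String(receive.payload().array(), "UTF-8")); // hello
        }
    }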

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/Receive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Receive.java b/clients/src/main/java/kafka/common/network/Receive.java
new file mode 100644
index 0000000..40ee942
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/Receive.java
@@ -0,0 +1,35 @@
+package kafka.common.network;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ScatteringByteChannel;
+
+/**
+ * This interface models the in-progress reading of data from a channel, sent by a source identified by an integer id
+ */
+public interface Receive {
+
+    /**
+     * The numeric id of the source from which we are receiving data.
+     */
+    public int source();
+
+    /**
+     * Are we done receiving data?
+     */
+    public boolean complete();
+
+    /**
+     * Turn this receive into ByteBuffer instances, if possible (otherwise returns null).
+     */
+    public ByteBuffer[] reify();
+
+    /**
+     * Read bytes into this receive from the given channel
+     * @param channel The channel to read from
+     * @return The number of bytes read
+     * @throws IOException If the reading fails
+     */
+    public long readFrom(ScatteringByteChannel channel) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Selectable.java b/clients/src/main/java/kafka/common/network/Selectable.java
new file mode 100644
index 0000000..794fc60
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/Selectable.java
@@ -0,0 +1,68 @@
+package kafka.common.network;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * An interface for asynchronous, multi-channel network I/O
+ */
+public interface Selectable {
+
+    /**
+     * Begin establishing a socket connection to the given address, identified by the given id
+     * @param id The id for this connection
+     * @param address The address to connect to
+     * @param sendBufferSize The send buffer for the socket
+     * @param receiveBufferSize The receive buffer for the socket
+     * @throws IOException If we cannot begin connecting
+     */
+    public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
+
+    /**
+     * Begin disconnecting the connection identified by the given id
+     */
+    public void disconnect(int id);
+
+    /**
+     * Wakeup this selector if it is blocked on I/O
+     */
+    public void wakeup();
+
+    /**
+     * Close this selector
+     */
+    public void close();
+
+    /**
+     * Initiate any sends provided, and make progress on any other I/O operations in-flight (connections,
+     * disconnections, existing sends, and receives)
+     * @param timeout The amount of time to block if there is nothing to do
+     * @param sends The new sends to initiate
+     * @throws IOException
+     */
+    public void poll(long timeout, List<NetworkSend> sends) throws IOException;
+
+    /**
+     * The list of sends that completed on the last {@link #poll(long, List) poll()} call.
+     */
+    public List<NetworkSend> completedSends();
+
+    /**
+     * The list of receives that completed on the last {@link #poll(long, List) poll()} call.
+     */
+    public List<NetworkReceive> completedReceives();
+
+    /**
+     * The list of connections that finished disconnecting on the last {@link #poll(long, List) poll()}
+     * call.
+     */
+    public List<Integer> disconnected();
+
+    /**
+     * The list of connections that completed their connection on the last {@link #poll(long, List) poll()}
+     * call.
+     */
+    public List<Integer> connected();
+
+}
\ No newline at end of file
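
Tying the interface together, a typical client drives all I/O through repeated poll() calls and inspects the completed-operation lists afterwards. A sketch against the Selector implementation that follows, assuming a hypothetical server listening at localhost:9092:

    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.List;

    import kafka.common.network.NetworkReceive;
    import kafka.common.network.NetworkSend;
    import kafka.common.network.Selectable;
    import kafka.common.network.Selector;

    public class SelectableLoopExample {
        public static void main(String[] args) throws Exception {
            Selectable selectable = new Selector();
            selectable.connect(0, new InetSocketAddress("localhost", 9092), 64 * 1024, 64 * 1024);

            List<NetworkSend> toSend = new ArrayList<NetworkSend>();
            for (int i = 0; i < 10; i++) {
                // initiate any queued sends and make progress on all in-flight I/O
                selectable.poll(1000L, toSend);
                toSend.clear();
                for (Integer id : selectable.connected())
                    System.out.println("connected: " + id);
                for (NetworkReceive receive : selectable.completedReceives())
                    System.out.println("received " + receive.payload().limit() + " bytes from " + receive.source());
                for (Integer id : selectable.disconnected())
                    System.out.println("disconnected: " + id);
            }
        }
    }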

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Selector.java b/clients/src/main/java/kafka/common/network/Selector.java
new file mode 100644
index 0000000..f53060c
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/Selector.java
@@ -0,0 +1,349 @@
+package kafka.common.network;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import kafka.common.KafkaException;
+
+/**
+ * A selector interface for doing non-blocking multi-connection network I/O.
+ * <p>
+ * This class works with {@link NetworkSend} and {@link NetworkReceive} to transmit size-delimited network requests and
+ * responses.
+ * <p>
+ * A connection can be added to the selector associated with an integer id by doing
+ * 
+ * <pre>
+ * selector.connect(42, new InetSocketAddress(&quot;google.com&quot;, server.port), 64000, 64000);
+ * </pre>
+ * 
+ * The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating
+ * the connection. The successful invocation of this method does not mean a valid connection has been established.
+ * 
+ * Sending requests, receiving responses, processing connection completions, and disconnections on the existing
+ * connections are all done using the <code>poll()</code> call.
+ * 
+ * <pre>
+ * List&lt;NetworkSend&gt; sendsToInitiate = Arrays.asList(new NetworkSend(0, myBytes), new NetworkSend(1, myOtherBytes));
+ * selector.poll(TIMEOUT_MS, sendsToInitiate);
+ * </pre>
+ * 
+ * The selector maintains several lists that are available via various getters. These lists are reset at the start of
+ * each call to <code>poll()</code>.
+ * 
+ * This class is not thread safe!
+ */
+public class Selector implements Selectable {
+
+    private final java.nio.channels.Selector selector;
+    private final Map<Integer, SelectionKey> keys;
+    private final List<NetworkSend> completedSends;
+    private final List<NetworkReceive> completedReceives;
+    private final List<Integer> disconnected;
+    private final List<Integer> connected;
+
+    /**
+     * Create a new selector
+     */
+    public Selector() {
+        try {
+            this.selector = java.nio.channels.Selector.open();
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+        this.keys = new HashMap<Integer, SelectionKey>();
+        this.completedSends = new ArrayList<NetworkSend>();
+        this.completedReceives = new ArrayList<NetworkReceive>();
+        this.connected = new ArrayList<Integer>();
+        this.disconnected = new ArrayList<Integer>();
+    }
+
+    /**
+     * Begin connecting to the given address and add the connection to this selector associated with the given id
+     * number.
+     * <p>
+     * Note that this call only initiates the connection, which will be completed on a future {@link #poll(long, List)}
+     * call. Check {@link #connected()} to see which (if any) connections have completed after a given poll call.
+     * @param id The id for the new connection
+     * @param address The address to connect to
+     * @param sendBufferSize The send buffer for the new connection
+     * @param receiveBufferSize The receive buffer for the new connection
+     * @throws IllegalStateException if there is already a connection for that id
+     * @throws UnresolvedAddressException if DNS resolution fails on the hostname
+     */
+    @Override
+    public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
+        SocketChannel channel = SocketChannel.open();
+        channel.configureBlocking(false);
+        Socket socket = channel.socket();
+        socket.setKeepAlive(true);
+        socket.setSendBufferSize(sendBufferSize);
+        socket.setReceiveBufferSize(receiveBufferSize);
+        socket.setTcpNoDelay(true);
+        try {
+            channel.connect(address);
+        } catch (UnresolvedAddressException e) {
+            channel.close();
+            throw e;
+        }
+        SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT);
+        key.attach(new Transmissions(id));
+        if (this.keys.containsKey(id))
+            throw new IllegalStateException("There is already a connection for id " + id);
+        this.keys.put(id, key);
+    }
+
+    /**
+     * Disconnect the connection for the given id (if there is one). The disconnection is asynchronous and will not be
+     * processed until the next {@link #poll(long, List) poll()} call.
+     */
+    @Override
+    public void disconnect(int id) {
+        SelectionKey key = this.keys.get(id);
+        if (key != null)
+            key.cancel();
+    }
+
+    /**
+     * Interrupt the selector if it is blocked waiting to do I/O.
+     */
+    @Override
+    public void wakeup() {
+        this.selector.wakeup();
+    }
+
+    /**
+     * Close this selector and all associated connections
+     */
+    @Override
+    public void close() {
+        for (SelectionKey key : this.selector.keys()) {
+            try {
+                close(key);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        try {
+            this.selector.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
+     * disconnections, initiating new sends, or making progress on in-progress sends or receives.
+     * <p>
+     * The provided network sends will be started.
+     * 
+     * When this call is completed the user can check for completed sends, receives, connections or disconnects using
+     * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
+     * lists will be cleared at the beginning of each {@link #poll(long, List)} call and repopulated with whatever I/O
+     * completed during that call.
+     * 
+     * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely.
+     * @param sends The list of new sends to begin
+     * 
+     * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
+     *         already an in-progress send
+     */
+    @Override
+    public void poll(long timeout, List<NetworkSend> sends) throws IOException {
+        clear();
+
+        /* register for write interest on any new sends */
+        for (NetworkSend send : sends) {
+            SelectionKey key = keyForId(send.destination());
+            Transmissions transmissions = transmissions(key);
+            if (transmissions.hasSend())
+                throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
+            transmissions.send = send;
+            try {
+                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+            } catch (CancelledKeyException e) {
+                close(key);
+            }
+        }
+
+        /* check ready keys */
+        int readyKeys = select(timeout);
+        if (readyKeys > 0) {
+            Set<SelectionKey> keys = this.selector.selectedKeys();
+            Iterator<SelectionKey> iter = keys.iterator();
+            while (iter.hasNext()) {
+                SelectionKey key = iter.next();
+                iter.remove();
+
+                Transmissions transmissions = transmissions(key);
+                SocketChannel channel = channel(key);
+                try {
+                    /*
+                     * complete any connections that have finished their handshake
+                     */
+                    if (key.isConnectable()) {
+                        channel.finishConnect();
+                        key.interestOps((key.interestOps() & ~SelectionKey.OP_CONNECT) | SelectionKey.OP_READ);
+                        this.connected.add(transmissions.id);
+                    }
+
+                    /* read from any connections that have readable data */
+                    if (key.isReadable()) {
+                        if (!transmissions.hasReceive())
+                            transmissions.receive = new NetworkReceive(transmissions.id);
+                        transmissions.receive.readFrom(channel);
+                        if (transmissions.receive.complete()) {
+                            transmissions.receive.payload().rewind();
+                            this.completedReceives.add(transmissions.receive);
+                            transmissions.clearReceive();
+                        }
+                    }
+
+                    /*
+                     * write to any sockets that have space in their buffer and for which we have data
+                     */
+                    if (key.isWritable()) {
+                        transmissions.send.writeTo(channel);
+                        if (transmissions.send.remaining() <= 0) {
+                            this.completedSends.add(transmissions.send);
+                            transmissions.clearSend();
+                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+                        }
+                    }
+
+                    /* cancel any defunct sockets */
+                    if (!key.isValid())
+                        close(key);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    close(key);
+                }
+            }
+        }
+    }
+
+    @Override
+    public List<NetworkSend> completedSends() {
+        return this.completedSends;
+    }
+
+    @Override
+    public List<NetworkReceive> completedReceives() {
+        return this.completedReceives;
+    }
+
+    @Override
+    public List<Integer> disconnected() {
+        return this.disconnected;
+    }
+
+    @Override
+    public List<Integer> connected() {
+        return this.connected;
+    }
+
+    /**
+     * Clear the results from the prior poll
+     */
+    private void clear() {
+        this.completedSends.clear();
+        this.completedReceives.clear();
+        this.connected.clear();
+        this.disconnected.clear();
+    }
+
+    /**
+     * Check for data, waiting up to the given timeout.
+     * 
+     * @param ms Length of time to wait, in milliseconds. If negative, wait indefinitely.
+     * @return The number of keys ready
+     * @throws IOException
+     */
+    private int select(long ms) throws IOException {
+        if (ms == 0L)
+            return this.selector.selectNow();
+        else if (ms < 0L)
+            return this.selector.select();
+        else
+            return this.selector.select(ms);
+    }
+
+    /**
+     * Begin closing this connection
+     */
+    private void close(SelectionKey key) throws IOException {
+        SocketChannel channel = channel(key);
+        Transmissions trans = transmissions(key);
+        if (trans != null)
+            this.disconnected.add(trans.id);
+        key.attach(null);
+        key.cancel();
+        channel.socket().close();
+        channel.close();
+    }
+
+    /**
+     * Get the selection key associated with this numeric id
+     */
+    private SelectionKey keyForId(int id) {
+        SelectionKey key = this.keys.get(id);
+        if (key == null)
+            throw new IllegalStateException("Attempt to write to socket for which there is no open connection.");
+        return key;
+    }
+
+    /**
+     * Get the transmissions for the given connection
+     */
+    private Transmissions transmissions(SelectionKey key) {
+        return (Transmissions) key.attachment();
+    }
+
+    /**
+     * Get the socket channel associated with this selection key
+     */
+    private SocketChannel channel(SelectionKey key) {
+        return (SocketChannel) key.channel();
+    }
+
+    /**
+     * The id and in-progress send and receive associated with a connection
+     */
+    private static class Transmissions {
+        public int id;
+        public NetworkSend send;
+        public NetworkReceive receive;
+
+        public Transmissions(int id) {
+            this.id = id;
+        }
+
+        public boolean hasSend() {
+            return this.send != null;
+        }
+
+        public void clearSend() {
+            this.send = null;
+        }
+
+        public boolean hasReceive() {
+            return this.receive != null;
+        }
+
+        public void clearReceive() {
+            this.receive = null;
+        }
+    }
+
+}
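
An illustrative sketch of the event loop this class is meant to support. The
NetworkSend(int, ByteBuffer) constructor and the running/requestBytes/handle()/
reconnectLater() names are assumptions for illustration, not part of this patch:

    Selector selector = new Selector();
    selector.connect(0, new InetSocketAddress("localhost", 9092), 64 * 1024, 64 * 1024);
    List<NetworkSend> sends = new ArrayList<NetworkSend>();
    while (running) {
        // start any queued sends and do whatever connects/reads/writes are ready
        selector.poll(500L, sends);
        sends.clear();
        // queue a first request on each newly established connection
        for (Integer id : selector.connected())
            sends.add(new NetworkSend(id, ByteBuffer.wrap(requestBytes)));
        // consume completed size-delimited responses
        for (NetworkReceive receive : selector.completedReceives())
            handle(receive.payload());
        // react to lost connections
        for (Integer id : selector.disconnected())
            reconnectLater(id);
    }
    selector.close();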

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/Send.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Send.java b/clients/src/main/java/kafka/common/network/Send.java
new file mode 100644
index 0000000..e7ef68a
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/Send.java
@@ -0,0 +1,41 @@
+package kafka.common.network;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+
+/**
+ * This interface models the in-progress sending of data to a destination identified by an integer id.
+ */
+public interface Send {
+
+    /**
+     * The numeric id for the destination of this send
+     */
+    public int destination();
+
+    /**
+     * The number of bytes remaining to send
+     */
+    public int remaining();
+
+    /**
+     * Is this send complete?
+     */
+    public boolean complete();
+
+    /**
+     * An optional method to turn this send into an array of ByteBuffers if possible (otherwise returns null)
+     */
+    public ByteBuffer[] reify();
+
+    /**
+     * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
+     * to be completely written.
+     * @param channel The channel to write to
+     * @return The number of bytes written
+     * @throws IOException If the write fails
+     */
+    public long writeTo(GatheringByteChannel channel) throws IOException;
+
+}
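
For illustration, a minimal single-buffer implementation of this contract might
look like the following sketch (the real implementations live elsewhere in this
patch; this one is purely illustrative):

    class SingleBufferSend implements Send {
        private final int destination;
        private final ByteBuffer buffer;

        public SingleBufferSend(int destination, ByteBuffer buffer) {
            this.destination = destination;
            this.buffer = buffer;
        }

        public int destination() { return destination; }

        public int remaining() { return buffer.remaining(); }

        public boolean complete() { return !buffer.hasRemaining(); }

        public ByteBuffer[] reify() { return new ByteBuffer[] { buffer }; }

        public long writeTo(GatheringByteChannel channel) throws IOException {
            // may write fewer bytes than remain; callers re-invoke until complete()
            return channel.write(buffer);
        }
    }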

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/kafka/common/protocol/ApiKeys.java
new file mode 100644
index 0000000..1e2f8bb
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/ApiKeys.java
@@ -0,0 +1,35 @@
+package kafka.common.protocol;
+
+/**
+ * Identifiers for all the Kafka APIs
+ */
+public enum ApiKeys {
+    PRODUCE(0, "produce"),
+    FETCH(1, "fetch"),
+    LIST_OFFSETS(2, "list_offsets"),
+    METADATA(3, "metadata"),
+    LEADER_AND_ISR(4, "leader_and_isr"),
+    STOP_REPLICA(5, "stop_replica"),
+    OFFSET_COMMIT(6, "offset_commit"),
+    OFFSET_FETCH(7, "offset_fetch");
+
+    public static int MAX_API_KEY = 0;
+
+    static {
+        for (ApiKeys key : ApiKeys.values()) {
+            MAX_API_KEY = Math.max(MAX_API_KEY, key.id);
+        }
+    }
+
+    /** the permanent and immutable id of an API--this can never change */
+    public final short id;
+
+    /** an English description of the API--this is for debugging and can change */
+    public final String name;
+
+    private ApiKeys(int id, String name) {
+        this.id = (short) id;
+        this.name = name;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/Errors.java b/clients/src/main/java/kafka/common/protocol/Errors.java
new file mode 100644
index 0000000..fb1a3e5
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/Errors.java
@@ -0,0 +1,97 @@
+package kafka.common.protocol;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import kafka.common.errors.ApiException;
+import kafka.common.errors.CorruptMessageException;
+import kafka.common.errors.LeaderNotAvailableException;
+import kafka.common.errors.MessageTooLargeException;
+import kafka.common.errors.NetworkException;
+import kafka.common.errors.NotLeaderForPartitionException;
+import kafka.common.errors.OffsetMetadataTooLarge;
+import kafka.common.errors.OffsetOutOfRangeException;
+import kafka.common.errors.TimeoutException;
+import kafka.common.errors.UnknownServerException;
+import kafka.common.errors.UnknownTopicOrPartitionException;
+
+/**
+ * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
+ * are thus part of the protocol. The names can be changed but the error code cannot.
+ * 
+ * Do not add exceptions that occur only on the client or only on the server here.
+ */
+public enum Errors {
+    UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
+    NONE(0, null),
+    OFFSET_OUT_OF_RANGE(1,
+                        new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
+    CORRUPT_MESSAGE(2,
+                    new CorruptMessageException("The message contents do not match the message CRC or the message is otherwise corrupt.")),
+    UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
+    LEADER_NOT_AVAILABLE(5,
+                         new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
+    NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
+    REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
+    MESSAGE_TOO_LARGE(10,
+                      new MessageTooLargeException("The request included a message larger than the max message size the server will accept.")),
+    OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
+    NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received."));
+
+    private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
+    private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
+    static {
+        for (Errors error : Errors.values()) {
+            codeToError.put(error.code(), error);
+            if (error.exception != null)
+                classToError.put(error.exception.getClass(), error);
+        }
+
+    }
+
+    private final short code;
+    private final ApiException exception;
+
+    private Errors(int code, ApiException exception) {
+        this.code = (short) code;
+        this.exception = exception;
+    }
+
+    /**
+     * An instance of the exception
+     */
+    public ApiException exception() {
+        return this.exception;
+    }
+
+    /**
+     * The error code for the exception
+     */
+    public short code() {
+        return this.code;
+    }
+
+    /**
+     * Throw the exception corresponding to this error if there is one
+     */
+    public void maybeThrow() {
+        if (exception != null)
+            throw this.exception;
+    }
+
+    /**
+     * Look up the error for the given numeric code, returning UNKNOWN if the code is not recognized
+     */
+    public static Errors forCode(short code) {
+        Errors error = codeToError.get(code);
+        return error == null ? UNKNOWN : error;
+    }
+
+    /**
+     * Return the error instance associated with this exception (or UNKNOWN if there is none)
+     */
+    public static Errors forException(Throwable t) {
+        Errors error = classToError.get(t.getClass());
+        return error == null ? UNKNOWN : error;
+    }
+}
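
A quick sketch of the intended round trip between exceptions and wire codes,
using only the constructors and methods shown above:

    // server side: translate a thrown exception into a protocol error code
    short code = Errors.forException(new TimeoutException("timed out")).code();  // 7

    // client side: translate the code back and surface it as an exception
    Errors error = Errors.forCode(code);   // REQUEST_TIMED_OUT
    error.maybeThrow();                    // throws the associated TimeoutException

    // codes this client doesn't know about degrade to UNKNOWN rather than failing
    Errors fallback = Errors.forCode((short) 42);  // UNKNOWN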

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
new file mode 100644
index 0000000..83dad53
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
@@ -0,0 +1,95 @@
+package kafka.common.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import kafka.common.Cluster;
+import kafka.common.Node;
+import kafka.common.PartitionInfo;
+import kafka.common.protocol.types.Schema;
+import kafka.common.protocol.types.Struct;
+
+public class ProtoUtils {
+
+    private static Schema schemaFor(Schema[][] schemas, int apiKey, int version) {
+        if (apiKey < 0 || apiKey >= schemas.length)
+            throw new IllegalArgumentException("Invalid api key: " + apiKey);
+        Schema[] versions = schemas[apiKey];
+        if (version < 0 || version >= versions.length)
+            throw new IllegalArgumentException("Invalid version for API key " + apiKey + ": " + version);
+        return versions[version];
+    }
+
+    public static short latestVersion(int apiKey) {
+        if (apiKey < 0 || apiKey >= Protocol.CURR_VERSION.length)
+            throw new IllegalArgumentException("Invalid api key: " + apiKey);
+        return Protocol.CURR_VERSION[apiKey];
+    }
+
+    public static Schema requestSchema(int apiKey, int version) {
+        return schemaFor(Protocol.REQUESTS, apiKey, version);
+    }
+
+    public static Schema currentRequestSchema(int apiKey) {
+        return requestSchema(apiKey, latestVersion(apiKey));
+    }
+
+    public static Schema responseSchema(int apiKey, int version) {
+        return schemaFor(Protocol.RESPONSES, apiKey, version);
+    }
+
+    public static Schema currentResponseSchema(int apiKey) {
+        return schemaFor(Protocol.RESPONSES, apiKey, latestVersion(apiKey));
+    }
+
+    public static Struct parseRequest(int apiKey, int version, ByteBuffer buffer) {
+        return (Struct) requestSchema(apiKey, version).read(buffer);
+    }
+
+    public static Struct parseResponse(int apiKey, ByteBuffer buffer) {
+        return (Struct) currentResponseSchema(apiKey).read(buffer);
+    }
+
+    public static Cluster parseMetadataResponse(Struct response) {
+        List<Node> brokers = new ArrayList<Node>();
+        Object[] brokerStructs = (Object[]) response.get("brokers");
+        for (int i = 0; i < brokerStructs.length; i++) {
+            Struct broker = (Struct) brokerStructs[i];
+            int nodeId = (Integer) broker.get("node_id");
+            String host = (String) broker.get("host");
+            int port = (Integer) broker.get("port");
+            brokers.add(new Node(nodeId, host, port));
+        }
+        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
+        Object[] topicInfos = (Object[]) response.get("topic_metadata");
+        for (int i = 0; i < topicInfos.length; i++) {
+            Struct topicInfo = (Struct) topicInfos[i];
+            short topicError = topicInfo.getShort("topic_error_code");
+            if (topicError == Errors.NONE.code()) {
+                String topic = topicInfo.getString("topic");
+                Object[] partitionInfos = (Object[]) topicInfo.get("partition_metadata");
+                for (int j = 0; j < partitionInfos.length; j++) {
+                    Struct partitionInfo = (Struct) partitionInfos[j];
+                    short partError = partitionInfo.getShort("partition_error_code");
+                    if (partError == Errors.NONE.code()) {
+                        int partition = partitionInfo.getInt("partition_id");
+                        int leader = partitionInfo.getInt("leader");
+                        int[] replicas = intArray((Object[]) partitionInfo.get("replicas"));
+                        int[] isr = intArray((Object[]) partitionInfo.get("isr"));
+                        partitions.add(new PartitionInfo(topic, partition, leader, replicas, isr));
+                    }
+                }
+            }
+        }
+        return new Cluster(brokers, partitions);
+    }
+
+    private static int[] intArray(Object[] ints) {
+        int[] copy = new int[ints.length];
+        for (int i = 0; i < ints.length; i++)
+            copy[i] = (Integer) ints[i];
+        return copy;
+    }
+
+}
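
A sketch of how these helpers compose: build a metadata response Struct, write
it out, and parse it back into a Cluster. This assumes the Struct.set(...) and
Schema.write(...)/sizeOf(...) API defined in kafka.common.protocol.types
elsewhere in this patch:

    Struct broker = new Struct(Protocol.BROKER)
        .set("node_id", 0).set("host", "localhost").set("port", 9092);
    Struct response = new Struct(Protocol.METADATA_RESPONSE_V0)
        .set("brokers", new Object[] { broker })
        .set("topic_metadata", new Object[0]);

    ByteBuffer buffer = ByteBuffer.allocate(Protocol.METADATA_RESPONSE_V0.sizeOf(response));
    Protocol.METADATA_RESPONSE_V0.write(buffer, response);
    buffer.rewind();

    Struct parsed = ProtoUtils.parseResponse(ApiKeys.METADATA.id, buffer);
    Cluster cluster = ProtoUtils.parseMetadataResponse(parsed);  // one broker, no partitions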

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/Protocol.java b/clients/src/main/java/kafka/common/protocol/Protocol.java
new file mode 100644
index 0000000..e191d6a
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/Protocol.java
@@ -0,0 +1,130 @@
+package kafka.common.protocol;
+
+import static kafka.common.protocol.types.Type.BYTES;
+import static kafka.common.protocol.types.Type.INT16;
+import static kafka.common.protocol.types.Type.INT32;
+import static kafka.common.protocol.types.Type.INT64;
+import static kafka.common.protocol.types.Type.STRING;
+import kafka.common.protocol.types.ArrayOf;
+import kafka.common.protocol.types.Field;
+import kafka.common.protocol.types.Schema;
+
+public class Protocol {
+
+    public static Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
+                                                     new Field("api_version", INT16, "The version of the API."),
+                                                     new Field("correlation_id",
+                                                               INT32,
+                                                               "A user-supplied integer value that will be passed back with the response"),
+                                                     new Field("client_id",
+                                                               STRING,
+                                                               "A user specified identifier for the client making the request."));
+
+    public static Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
+                                                                INT32,
+                                                                "The user-supplied value passed in with the request"));
+
+    /* Metadata api */
+
+    public static Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
+                                                                    new ArrayOf(STRING),
+                                                                    "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics."));
+
+    public static Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."),
+                                             new Field("host", STRING, "The hostname of the broker."),
+                                             new Field("port", INT32, "The port on which the broker accepts requests."));
+
+    public static Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
+                                                                      INT16,
+                                                                      "The error code for the partition, if any."),
+                                                            new Field("partition_id", INT32, "The id of the partition."),
+                                                            new Field("leader",
+                                                                      INT32,
+                                                                      "The id of the broker acting as leader for this partition."),
+                                                            new Field("replicas",
+                                                                      new ArrayOf(INT32),
+                                                                      "The set of all nodes that host this partition."),
+                                                            new Field("isr",
+                                                                      new ArrayOf(INT32),
+                                                                      "The set of nodes that are in sync with the leader for this partition."));
+
+    public static Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
+                                                        new Field("topic", STRING, "The name of the topic"),
+                                                        new Field("partition_metadata",
+                                                                  new ArrayOf(PARTITION_METADATA_V0),
+                                                                  "Metadata for each partition of the topic."));
+
+    public static Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
+                                                                     new ArrayOf(BROKER),
+                                                                     "Host and port information for all brokers."),
+                                                           new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V0)));
+
+    public static Schema[] METADATA_REQUEST = new Schema[] { METADATA_REQUEST_V0 };
+    public static Schema[] METADATA_RESPONSE = new Schema[] { METADATA_RESPONSE_V0 };
+
+    /* Produce api */
+
+    public static Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
+                                                            new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
+                                                                                                     new Field("message_set", BYTES)))));
+
+    public static Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
+                                                                   INT16,
+                                                                   "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
+                                                         new Field("timeout", INT32, "The time to await a response in ms."),
+                                                         new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
+
+    public static Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
+                                                                    new ArrayOf(new Schema(new Field("topic", STRING),
+                                                                                           new Field("partition_responses",
+                                                                                                     new ArrayOf(new Schema(new Field("partition",
+                                                                                                                                      INT32),
+                                                                                                                            new Field("error_code",
+                                                                                                                                      INT16),
+                                                                                                                            new Field("base_offset",
+                                                                                                                                      INT64))))))));
+
+    public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 };
+    public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 };
+
+    /* an array of all requests and responses with all schema versions */
+    public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
+    public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
+
+    /* the latest version of each api */
+    public static short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
+
+    static {
+        REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
+        REQUESTS[ApiKeys.FETCH.id] = new Schema[] {};
+        REQUESTS[ApiKeys.LIST_OFFSETS.id] = new Schema[] {};
+        REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
+        REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
+        REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+        REQUESTS[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
+        REQUESTS[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
+
+        RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
+        RESPONSES[ApiKeys.FETCH.id] = new Schema[] {};
+        RESPONSES[ApiKeys.LIST_OFFSETS.id] = new Schema[] {};
+        RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
+        RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
+        RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+        RESPONSES[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
+        RESPONSES[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
+
+        /* set the maximum version of each api */
+        for (ApiKeys api : ApiKeys.values())
+            CURR_VERSION[api.id] = (short) (REQUESTS[api.id].length - 1);
+
+        /* sanity check that we have the same number of request and response versions for each api */
+        for (ApiKeys api : ApiKeys.values())
+            if (REQUESTS[api.id].length != RESPONSES[api.id].length)
+                throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api "
+                                                + api.name
+                                                + " but "
+                                                + RESPONSES[api.id].length
+                                                + " response versions.");
+    }
+
+}
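
The REQUESTS/RESPONSES tables are indexed first by api key and then by version,
so schema resolution is just two array lookups:

    short api = ApiKeys.PRODUCE.id;                    // 0
    short latest = Protocol.CURR_VERSION[api];         // highest defined version (0 here)
    Schema schema = Protocol.REQUESTS[api][latest];    // PRODUCE_REQUEST_V0
    // or equivalently via the helper:
    Schema same = ProtoUtils.currentRequestSchema(api);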

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java b/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java
new file mode 100644
index 0000000..5daf95b
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java
@@ -0,0 +1,63 @@
+package kafka.common.protocol.types;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Represents a type for an array of a particular type
+ */
+public class ArrayOf extends Type {
+
+    private final Type type;
+
+    public ArrayOf(Type type) {
+        this.type = type;
+    }
+
+    @Override
+    public void write(ByteBuffer buffer, Object o) {
+        Object[] objs = (Object[]) o;
+        int size = objs.length;
+        buffer.putInt(size);
+        for (int i = 0; i < size; i++)
+            type.write(buffer, objs[i]);
+    }
+
+    @Override
+    public Object read(ByteBuffer buffer) {
+        int size = buffer.getInt();
+        Object[] objs = new Object[size];
+        for (int i = 0; i < size; i++)
+            objs[i] = type.read(buffer);
+        return objs;
+    }
+
+    @Override
+    public int sizeOf(Object o) {
+        Object[] objs = (Object[]) o;
+        int size = 4;
+        for (int i = 0; i < objs.length; i++)
+            size += type.sizeOf(objs[i]);
+        return size;
+    }
+
+    public Type type() {
+        return type;
+    }
+
+    @Override
+    public String toString() {
+        return "ARRAY(" + type + ")";
+    }
+
+    @Override
+    public Object[] validate(Object item) {
+        try {
+            Object[] array = (Object[]) item;
+            for (int i = 0; i < array.length; i++)
+                type.validate(array[i]);
+            return array;
+        } catch (ClassCastException e) {
+            throw new SchemaException("Not an Object[].");
+        }
+    }
+}
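
A small round-trip sketch of this type, using the primitive INT32 type from the
same package:

    ArrayOf ints = new ArrayOf(Type.INT32);
    Object[] values = new Object[] { 1, 2, 3 };

    ByteBuffer buffer = ByteBuffer.allocate(ints.sizeOf(values));  // 4-byte count + 4 bytes per element
    ints.write(buffer, values);
    buffer.rewind();

    Object[] readBack = (Object[]) ints.read(buffer);  // { 1, 2, 3 }
    ints.validate(readBack);  // throws SchemaException if the item is not an Object[]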