You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/07 01:26:42 UTC

[11/13] Rename client package from kafka.* to org.apache.kafka.*

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Metrics.java b/clients/src/main/java/kafka/common/metrics/Metrics.java
deleted file mode 100644
index f2cb782..0000000
--- a/clients/src/main/java/kafka/common/metrics/Metrics.java
+++ /dev/null
@@ -1,190 +0,0 @@
-package kafka.common.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import kafka.common.utils.SystemTime;
-import kafka.common.utils.Time;
-import kafka.common.utils.Utils;
-
-/**
- * A registry of sensors and metrics.
- * <p>
- * A metric is a named, numerical measurement. A sensor is a handle to record numerical measurements as they occur. Each
- * Sensor has zero or more associated metrics. For example a Sensor might represent message sizes and we might associate
- * with this sensor a metric for the average, maximum, or other statistics computed off the sequence of message sizes
- * that are recorded by the sensor.
- * <p>
- * Usage looks something like this:
- * 
- * <pre>
- * // set up metrics:
- * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
- * Sensor sensor = metrics.sensor(&quot;message-sizes&quot;);
- * sensor.add(&quot;kafka.producer.message-sizes.avg&quot;, new Avg());
- * sensor.add(&quot;kafka.producer.message-sizes.max&quot;, new Max());
- * 
- * // as messages are sent we record the sizes
- * sensor.record(messageSize);
- * </pre>
- */
-public class Metrics {
-
-    private final MetricConfig config;
-    private final ConcurrentMap<String, KafkaMetric> metrics;
-    private final ConcurrentMap<String, Sensor> sensors;
-    private final List<MetricsReporter> reporters;
-    private final Time time;
-
-    /**
-     * Create a metrics repository with no metric reporters and default configuration.
-     */
-    public Metrics() {
-        this(new MetricConfig());
-    }
-
-    /**
-     * Create a metrics repository with no metric reporters and default configuration.
-     */
-    public Metrics(Time time) {
-        this(new MetricConfig(), new ArrayList<MetricsReporter>(), time);
-    }
-
-    /**
-     * Create a metrics repository with no reporters and the given default config. This config will be used for any
-     * metric that doesn't override its own config.
-     * @param defaultConfig The default config to use for all metrics that don't override their config
-     */
-    public Metrics(MetricConfig defaultConfig) {
-        this(defaultConfig, new ArrayList<MetricsReporter>(0), new SystemTime());
-    }
-
-    /**
-     * Create a metrics repository with a default config and the given metric reporters
-     * @param defaultConfig The default config
-     * @param reporters The metrics reporters
-     * @param time The time instance to use with the metrics
-     */
-    public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time) {
-        this.config = defaultConfig;
-        this.sensors = new ConcurrentHashMap<String, Sensor>();
-        this.metrics = new ConcurrentHashMap<String, KafkaMetric>();
-        this.reporters = Utils.notNull(reporters);
-        this.time = time;
-        for (MetricsReporter reporter : reporters)
-            reporter.init(new ArrayList<KafkaMetric>());
-    }
-
-    /**
-     * Create a sensor with the given unique name and zero or more parent sensors. All parent sensors will receive every
-     * value recorded with this sensor.
-     * @param name The name of the sensor
-     * @param parents The parent sensors
-     * @return The sensor that is created
-     */
-    public Sensor sensor(String name, Sensor... parents) {
-        return sensor(name, null, parents);
-    }
-
-    /**
-     * Create a sensor with the given unique name and zero or more parent sensors. All parent sensors will receive every
-     * value recorded with this sensor.
-     * @param name The name of the sensor
-     * @param config A default configuration to use for this sensor for metrics that don't have their own config
-     * @param parents The parent sensors
-     * @return The sensor that is created
-     */
-    public synchronized Sensor sensor(String name, MetricConfig config, Sensor... parents) {
-        Sensor s = this.sensors.get(Utils.notNull(name));
-        if (s == null) {
-            s = new Sensor(this, name, parents, config == null ? this.config : config, time);
-            this.sensors.put(name, s);
-        }
-        return s;
-    }
-
-    /**
-     * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
-     * This is a way to expose existing values as metrics.
-     * @param name The name of the metric
-     * @param measurable The measurable that will be measured by this metric
-     */
-    public void addMetric(String name, Measurable measurable) {
-        addMetric(name, "", measurable);
-    }
-
-    /**
-     * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
-     * This is a way to expose existing values as metrics.
-     * @param name The name of the metric
-     * @param description A human-readable description to include in the metric
-     * @param measurable The measurable that will be measured by this metric
-     */
-    public void addMetric(String name, String description, Measurable measurable) {
-        addMetric(name, description, null, measurable);
-    }
-
-    /**
-     * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
-     * This is a way to expose existing values as metrics.
-     * @param name The name of the metric
-     * @param config The configuration to use when measuring this measurable
-     * @param measurable The measurable that will be measured by this metric
-     */
-    public void addMetric(String name, MetricConfig config, Measurable measurable) {
-        addMetric(name, "", config, measurable);
-    }
-
-    /**
-     * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
-     * This is a way to expose existing values as metrics.
-     * @param name The name of the metric
-     * @param description A human-readable description to include in the metric
-     * @param config The configuration to use when measuring this measurable
-     * @param measurable The measurable that will be measured by this metric
-     */
-    public synchronized void addMetric(String name, String description, MetricConfig config, Measurable measurable) {
-        KafkaMetric m = new KafkaMetric(new Object(),
-                                        Utils.notNull(name),
-                                        Utils.notNull(description),
-                                        Utils.notNull(measurable),
-                                        config == null ? this.config : config,
-                                        time);
-        registerMetric(m);
-    }
-
-    /**
-     * Add a MetricReporter
-     */
-    public synchronized void addReporter(MetricsReporter reporter) {
-        Utils.notNull(reporter).init(new ArrayList<KafkaMetric>(metrics.values()));
-        this.reporters.add(reporter);
-    }
-
-    synchronized void registerMetric(KafkaMetric metric) {
-        if (this.metrics.containsKey(metric.name()))
-            throw new IllegalArgumentException("A metric named '" + metric.name() + "' already exists, can't register another one.");
-        this.metrics.put(metric.name(), metric);
-        for (MetricsReporter reporter : reporters)
-            reporter.metricChange(metric);
-    }
-
-    /**
-     * Get all the metrics currently maintained indexed by metric name
-     */
-    public Map<String, KafkaMetric> metrics() {
-        return this.metrics;
-    }
-
-    /**
-     * Close this metrics repository.
-     */
-    public void close() {
-        for (MetricsReporter reporter : this.reporters)
-            reporter.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/kafka/common/metrics/MetricsReporter.java
deleted file mode 100644
index bf0b39e..0000000
--- a/clients/src/main/java/kafka/common/metrics/MetricsReporter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package kafka.common.metrics;
-
-import java.util.List;
-
-/**
- * A plugin interface to allow things to listen as new metrics are created so they can be reported
- */
-public interface MetricsReporter {
-
-    /**
-     * This is called when the reporter is first registered to initially register all existing metrics
-     * @param metrics All currently existing metrics
-     */
-    public void init(List<KafkaMetric> metrics);
-
-    /**
-     * This is called whenever a metric is updated or added
-     * @param metric
-     */
-    public void metricChange(KafkaMetric metric);
-
-    /**
-     * Called when the metrics repository is closed.
-     */
-    public void close();
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/Quota.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Quota.java b/clients/src/main/java/kafka/common/metrics/Quota.java
deleted file mode 100644
index 6278246..0000000
--- a/clients/src/main/java/kafka/common/metrics/Quota.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package kafka.common.metrics;
-
-/**
- * An upper or lower bound for metrics
- */
-public final class Quota {
-
-    private final boolean upper;
-    private final double bound;
-
-    public Quota(double bound, boolean upper) {
-        this.bound = bound;
-        this.upper = upper;
-    }
-
-    public static Quota lessThan(double upperBound) {
-        return new Quota(upperBound, true);
-    }
-
-    public static Quota moreThan(double lowerBound) {
-        return new Quota(lowerBound, false);
-    }
-
-    public boolean isUpperBound() {
-        return this.upper;
-    }
-
-    public double bound() {
-        return this.bound;
-    }
-
-    public boolean acceptable(double value) {
-        return (upper && value <= bound) || (!upper && value >= bound);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java b/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java
deleted file mode 100644
index b9005cd..0000000
--- a/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package kafka.common.metrics;
-
-import kafka.common.KafkaException;
-
-/**
- * Thrown when a sensor records a value that causes a metric to go outside the bounds configured as its quota
- */
-public class QuotaViolationException extends KafkaException {
-
-    private static final long serialVersionUID = 1L;
-
-    public QuotaViolationException(String m) {
-        super(m);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Sensor.java b/clients/src/main/java/kafka/common/metrics/Sensor.java
deleted file mode 100644
index 9c11835..0000000
--- a/clients/src/main/java/kafka/common/metrics/Sensor.java
+++ /dev/null
@@ -1,171 +0,0 @@
-package kafka.common.metrics;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import kafka.common.metrics.CompoundStat.NamedMeasurable;
-import kafka.common.utils.Time;
-import kafka.common.utils.Utils;
-
-/**
- * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
- * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
- * of metrics about request sizes such as the average or max.
- */
-public final class Sensor {
-
-    private final Metrics registry;
-    private final String name;
-    private final Sensor[] parents;
-    private final List<Stat> stats;
-    private final List<KafkaMetric> metrics;
-    private final MetricConfig config;
-    private final Time time;
-
-    Sensor(Metrics registry, String name, Sensor[] parents, MetricConfig config, Time time) {
-        super();
-        this.registry = registry;
-        this.name = Utils.notNull(name);
-        this.parents = parents;
-        this.metrics = new ArrayList<KafkaMetric>();
-        this.stats = new ArrayList<Stat>();
-        this.config = config;
-        this.time = time;
-        checkForest(new HashSet<Sensor>());
-    }
-
-    /* Validate that this sensor doesn't end up referencing itself */
-    private void checkForest(Set<Sensor> sensors) {
-        if (!sensors.add(this))
-            throw new IllegalArgumentException("Circular dependency in sensors: " + name() + " is its own parent.");
-        for (int i = 0; i < parents.length; i++)
-            parents[i].checkForest(sensors);
-    }
-
-    /**
-     * The name this sensor is registered with. This name will be unique among all registered sensors.
-     */
-    public String name() {
-        return this.name;
-    }
-
-    /**
-     * Record an occurrence, this is just short-hand for {@link #record(double) record(1.0)}
-     */
-    public void record() {
-        record(1.0);
-    }
-
-    /**
-     * Record a value with this sensor
-     * @param value The value to record
-     * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
-     *         bound
-     */
-    public void record(double value) {
-        record(value, time.nanoseconds());
-    }
-
-    private void record(double value, long time) {
-        synchronized (this) {
-            // increment all the stats
-            for (int i = 0; i < this.stats.size(); i++)
-                this.stats.get(i).record(config, value, time);
-            checkQuotas(time);
-
-        }
-        for (int i = 0; i < parents.length; i++)
-            parents[i].record(value, time);
-    }
-
-    private void checkQuotas(long time) {
-        for (int i = 0; i < this.metrics.size(); i++) {
-            KafkaMetric metric = this.metrics.get(i);
-            MetricConfig config = metric.config();
-            if (config != null) {
-                Quota quota = config.quota();
-                if (quota != null)
-                    if (!quota.acceptable(metric.value(time)))
-                        throw new QuotaViolationException("Metric " + metric.name() + " is in violation of its quota of " + quota.bound());
-            }
-        }
-    }
-
-    /**
-     * Register a compound statistic with this sensor with no config override
-     */
-    public void add(CompoundStat stat) {
-        add(stat, null);
-    }
-
-    /**
-     * Register a compound statistic with this sensor which yields multiple measurable quantities (like a histogram)
-     * @param stat The stat to register
-     * @param config The configuration for this stat. If null then the stat will use the default configuration for this
-     *        sensor.
-     */
-    public synchronized void add(CompoundStat stat, MetricConfig config) {
-        this.stats.add(Utils.notNull(stat));
-        for (NamedMeasurable m : stat.stats()) {
-            KafkaMetric metric = new KafkaMetric(this, m.name(), m.description(), m.stat(), config == null ? this.config : config, time);
-            this.registry.registerMetric(metric);
-            this.metrics.add(metric);
-        }
-    }
-
-    /**
-     * Add a metric with default configuration and no description. Equivalent to
-     * {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, "", stat, null)}
-     * 
-     */
-    public void add(String name, MeasurableStat stat) {
-        add(name, stat, null);
-    }
-
-    /**
-     * Add a metric with default configuration. Equivalent to
-     * {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, description, stat, null)}
-     * 
-     */
-    public void add(String name, String description, MeasurableStat stat) {
-        add(name, description, stat, null);
-    }
-
-    /**
-     * Add a metric to this sensor with no description. Equivalent to
-     * {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, "", stat, config)}
-     * @param name
-     * @param stat
-     * @param config
-     */
-    public void add(String name, MeasurableStat stat, MetricConfig config) {
-        add(name, "", stat, config);
-    }
-
-    /**
-     * Register a metric with this sensor
-     * @param name The name of the metric
-     * @param description A description used when reporting the value
-     * @param stat The statistic to keep
-     * @param config A special configuration for this metric. If null use the sensor default configuration.
-     */
-    public synchronized void add(String name, String description, MeasurableStat stat, MetricConfig config) {
-        KafkaMetric metric = new KafkaMetric(this,
-                                             Utils.notNull(name),
-                                             Utils.notNull(description),
-                                             Utils.notNull(stat),
-                                             config == null ? this.config : config,
-                                             time);
-        this.registry.registerMetric(metric);
-        this.metrics.add(metric);
-        this.stats.add(stat);
-    }
-
-    synchronized List<KafkaMetric> metrics() {
-        return Collections.unmodifiableList(this.metrics);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/Stat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Stat.java b/clients/src/main/java/kafka/common/metrics/Stat.java
deleted file mode 100644
index 8844545..0000000
--- a/clients/src/main/java/kafka/common/metrics/Stat.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package kafka.common.metrics;
-
-/**
- * A Stat is a quanity such as average, max, etc that is computed off the stream of updates to a sensor
- */
-public interface Stat {
-
-    /**
-     * Record the given value
-     * @param config The configuration to use for this metric
-     * @param value The value to record
-     * @param time The time this value occurred
-     */
-    public void record(MetricConfig config, double value, long time);
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Avg.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Avg.java b/clients/src/main/java/kafka/common/metrics/stats/Avg.java
deleted file mode 100644
index b9d3d5d..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Avg.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package kafka.common.metrics.stats;
-
-import java.util.List;
-
-import kafka.common.metrics.MetricConfig;
-
-/**
- * A {@link SampledStat} that maintains a simple average over its samples.
- */
-public class Avg extends SampledStat {
-
-    public Avg() {
-        super(0.0);
-    }
-
-    @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
-        sample.value += value;
-    }
-
-    @Override
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
-        double total = 0.0;
-        long count = 0;
-        for (int i = 0; i < samples.size(); i++) {
-            Sample s = samples.get(i);
-            total += s.value;
-            count += s.eventCount;
-        }
-        return total / count;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Count.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Count.java b/clients/src/main/java/kafka/common/metrics/stats/Count.java
deleted file mode 100644
index 3712e78..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Count.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package kafka.common.metrics.stats;
-
-import java.util.List;
-
-import kafka.common.metrics.MetricConfig;
-
-/**
- * A {@link SampledStat} that maintains a simple count of what it has seen.
- */
-public class Count extends SampledStat {
-
-    public Count() {
-        super(0);
-    }
-
-    @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
-        sample.value += 1.0;
-    }
-
-    @Override
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
-        double total = 0.0;
-        for (int i = 0; i < samples.size(); i++)
-            total += samples.get(i).value;
-        return total;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Histogram.java b/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
deleted file mode 100644
index 9922571..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
+++ /dev/null
@@ -1,141 +0,0 @@
-package kafka.common.metrics.stats;
-
-public class Histogram {
-
-    private final BinScheme binScheme;
-    private final float[] hist;
-    private double count;
-
-    public Histogram(BinScheme binScheme) {
-        this.hist = new float[binScheme.bins()];
-        this.count = 0.0f;
-        this.binScheme = binScheme;
-    }
-
-    public void record(double value) {
-        this.hist[binScheme.toBin(value)] += 1.0f;
-        this.count += 1.0f;
-    }
-
-    public double value(double quantile) {
-        if (count == 0.0d)
-            return Double.NaN;
-        float sum = 0.0f;
-        float quant = (float) quantile;
-        for (int i = 0; i < this.hist.length - 1; i++) {
-            sum += this.hist[i];
-            if (sum / count > quant)
-                return binScheme.fromBin(i);
-        }
-        return Float.POSITIVE_INFINITY;
-    }
-
-    public float[] counts() {
-        return this.hist;
-    }
-
-    public void clear() {
-        for (int i = 0; i < this.hist.length; i++)
-            this.hist[i] = 0.0f;
-        this.count = 0;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder b = new StringBuilder('{');
-        for (int i = 0; i < this.hist.length - 1; i++) {
-            b.append(String.format("%.10f", binScheme.fromBin(i)));
-            b.append(':');
-            b.append(String.format("%.0f", this.hist[i]));
-            b.append(',');
-        }
-        b.append(Float.POSITIVE_INFINITY);
-        b.append(':');
-        b.append(this.hist[this.hist.length - 1]);
-        b.append('}');
-        return b.toString();
-    }
-
-    public interface BinScheme {
-        public int bins();
-
-        public int toBin(double value);
-
-        public double fromBin(int bin);
-    }
-
-    public static class ConstantBinScheme implements BinScheme {
-        private final double min;
-        private final double max;
-        private final int bins;
-        private final double bucketWidth;
-
-        public ConstantBinScheme(int bins, double min, double max) {
-            if (bins < 2)
-                throw new IllegalArgumentException("Must have at least 2 bins.");
-            this.min = min;
-            this.max = max;
-            this.bins = bins;
-            this.bucketWidth = (max - min) / (bins - 2);
-        }
-
-        public int bins() {
-            return this.bins;
-        }
-
-        public double fromBin(int b) {
-            if (b == 0)
-                return Double.NEGATIVE_INFINITY;
-            else if (b == bins - 1)
-                return Double.POSITIVE_INFINITY;
-            else
-                return min + (b - 1) * bucketWidth;
-        }
-
-        public int toBin(double x) {
-            if (x < min)
-                return 0;
-            else if (x > max)
-                return bins - 1;
-            else
-                return (int) ((x - min) / bucketWidth) + 1;
-        }
-    }
-
-    public static class LinearBinScheme implements BinScheme {
-        private final int bins;
-        private final double max;
-        private final double scale;
-
-        public LinearBinScheme(int numBins, double max) {
-            this.bins = numBins;
-            this.max = max;
-            this.scale = max / (numBins * (numBins - 1) / 2);
-        }
-
-        public int bins() {
-            return this.bins;
-        }
-
-        public double fromBin(int b) {
-            if (b == this.bins - 1) {
-                return Float.POSITIVE_INFINITY;
-            } else {
-                double unscaled = (b * (b + 1.0)) / 2.0;
-                return unscaled * this.scale;
-            }
-        }
-
-        public int toBin(double x) {
-            if (x < 0.0d) {
-                throw new IllegalArgumentException("Values less than 0.0 not accepted.");
-            } else if (x > this.max) {
-                return this.bins - 1;
-            } else {
-                double scaled = x / this.scale;
-                return (int) (-0.5 + Math.sqrt(2.0 * scaled + 0.25));
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Max.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Max.java b/clients/src/main/java/kafka/common/metrics/stats/Max.java
deleted file mode 100644
index e7bd1d2..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Max.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package kafka.common.metrics.stats;
-
-import java.util.List;
-
-import kafka.common.metrics.MetricConfig;
-
-/**
- * A {@link SampledStat} that gives the max over its samples.
- */
-public final class Max extends SampledStat {
-
-    public Max() {
-        super(Double.NEGATIVE_INFINITY);
-    }
-
-    @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
-        sample.value = Math.max(sample.value, value);
-    }
-
-    @Override
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
-        double max = Double.NEGATIVE_INFINITY;
-        for (int i = 0; i < samples.size(); i++)
-            max = Math.max(max, samples.get(i).value);
-        return max;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Min.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Min.java b/clients/src/main/java/kafka/common/metrics/stats/Min.java
deleted file mode 100644
index db0ab92..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Min.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package kafka.common.metrics.stats;
-
-import java.util.List;
-
-import kafka.common.metrics.MetricConfig;
-
-/**
- * A {@link SampledStat} that gives the min over its samples.
- */
-public class Min extends SampledStat {
-
-    public Min() {
-        super(Double.MIN_VALUE);
-    }
-
-    @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
-        sample.value = Math.min(sample.value, value);
-    }
-
-    @Override
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
-        double max = Double.MAX_VALUE;
-        for (int i = 0; i < samples.size(); i++)
-            max = Math.min(max, samples.get(i).value);
-        return max;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Percentile.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Percentile.java b/clients/src/main/java/kafka/common/metrics/stats/Percentile.java
deleted file mode 100644
index 84320bb..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Percentile.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package kafka.common.metrics.stats;
-
-public class Percentile {
-
-    private final String name;
-    private final String description;
-    private final double percentile;
-
-    public Percentile(String name, double percentile) {
-        this(name, "", percentile);
-    }
-
-    public Percentile(String name, String description, double percentile) {
-        super();
-        this.name = name;
-        this.description = description;
-        this.percentile = percentile;
-    }
-
-    public String name() {
-        return this.name;
-    }
-
-    public String description() {
-        return this.description;
-    }
-
-    public double percentile() {
-        return this.percentile;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
deleted file mode 100644
index c3f8942..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package kafka.common.metrics.stats;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import kafka.common.metrics.CompoundStat;
-import kafka.common.metrics.Measurable;
-import kafka.common.metrics.MetricConfig;
-import kafka.common.metrics.stats.Histogram.BinScheme;
-import kafka.common.metrics.stats.Histogram.ConstantBinScheme;
-import kafka.common.metrics.stats.Histogram.LinearBinScheme;
-
-/**
- * A compound stat that reports one or more percentiles
- */
-public class Percentiles extends SampledStat implements CompoundStat {
-
-    public static enum BucketSizing {
-        CONSTANT, LINEAR
-    }
-
-    private final int buckets;
-    private final Percentile[] percentiles;
-    private final BinScheme binScheme;
-
-    public Percentiles(int sizeInBytes, double max, BucketSizing bucketing, Percentile... percentiles) {
-        this(sizeInBytes, 0.0, max, bucketing, percentiles);
-    }
-
-    public Percentiles(int sizeInBytes, double min, double max, BucketSizing bucketing, Percentile... percentiles) {
-        super(0.0);
-        this.percentiles = percentiles;
-        this.buckets = sizeInBytes / 4;
-        if (bucketing == BucketSizing.CONSTANT) {
-            this.binScheme = new ConstantBinScheme(buckets, min, max);
-        } else if (bucketing == BucketSizing.LINEAR) {
-            if (min != 0.0d)
-                throw new IllegalArgumentException("Linear bucket sizing requires min to be 0.0.");
-            this.binScheme = new LinearBinScheme(buckets, max);
-        } else {
-            throw new IllegalArgumentException("Unknown bucket type: " + bucketing);
-        }
-    }
-
-    @Override
-    public List<NamedMeasurable> stats() {
-        List<NamedMeasurable> ms = new ArrayList<NamedMeasurable>(this.percentiles.length);
-        for (Percentile percentile : this.percentiles) {
-            final double pct = percentile.percentile();
-            ms.add(new NamedMeasurable(percentile.name(), percentile.description(), new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return value(config, now, pct / 100.0);
-                }
-            }));
-        }
-        return ms;
-    }
-
-    public double value(MetricConfig config, long now, double quantile) {
-        timeoutObsoleteSamples(config, now);
-        float count = 0.0f;
-        for (Sample sample : this.samples)
-            count += sample.eventCount;
-        if (count == 0.0f)
-            return Double.NaN;
-        float sum = 0.0f;
-        float quant = (float) quantile;
-        for (int b = 0; b < buckets; b++) {
-            for (int s = 0; s < this.samples.size(); s++) {
-                HistogramSample sample = (HistogramSample) this.samples.get(s);
-                float[] hist = sample.histogram.counts();
-                sum += hist[b];
-                if (sum / count > quant)
-                    return binScheme.fromBin(b);
-            }
-        }
-        return Double.POSITIVE_INFINITY;
-    }
-
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
-        return value(config, now, 0.5);
-    }
-
-    @Override
-    protected HistogramSample newSample(long now) {
-        return new HistogramSample(this.binScheme, now);
-    }
-
-    @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
-        HistogramSample hist = (HistogramSample) sample;
-        hist.histogram.record(value);
-    }
-
-    private static class HistogramSample extends SampledStat.Sample {
-        private final Histogram histogram;
-
-        private HistogramSample(BinScheme scheme, long now) {
-            super(0.0, now);
-            this.histogram = new Histogram(scheme);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/kafka/common/metrics/stats/Rate.java
deleted file mode 100644
index 3f24a92..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Rate.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package kafka.common.metrics.stats;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import kafka.common.metrics.MeasurableStat;
-import kafka.common.metrics.MetricConfig;
-
-/**
- * The rate of the given quanitity. By default this is the total observed over a set of samples from a sampled statistic
- * divided by the ellapsed time over the sample windows. Alternative {@link SampledStat} implementations can be
- * provided, however, to record the rate of occurences (e.g. the count of values measured over the time interval) or
- * other such values.
- */
-public class Rate implements MeasurableStat {
-
-    private final TimeUnit unit;
-    private final SampledStat stat;
-
-    public Rate(TimeUnit unit) {
-        this(unit, new SampledTotal());
-    }
-
-    public Rate(TimeUnit unit, SampledStat stat) {
-        this.stat = stat;
-        this.unit = unit;
-    }
-
-    public String unitName() {
-        return unit.name().substring(0, unit.name().length() - 2).toLowerCase();
-    }
-
-    @Override
-    public void record(MetricConfig config, double value, long time) {
-        this.stat.record(config, value, time);
-    }
-
-    @Override
-    public double measure(MetricConfig config, long now) {
-        double ellapsed = convert(now - stat.oldest().lastWindow);
-        return stat.measure(config, now) / ellapsed;
-    }
-
-    private double convert(long time) {
-        switch (unit) {
-            case NANOSECONDS:
-                return time;
-            case MICROSECONDS:
-                return time / 1000.0;
-            case MILLISECONDS:
-                return time / (1000.0 * 1000.0);
-            case SECONDS:
-                return time / (1000.0 * 1000.0 * 1000.0);
-            case MINUTES:
-                return time / (60.0 * 1000.0 * 1000.0 * 1000.0);
-            case HOURS:
-                return time / (60.0 * 60.0 * 1000.0 * 1000.0 * 1000.0);
-            case DAYS:
-                return time / (24.0 * 60.0 * 60.0 * 1000.0 * 1000.0 * 1000.0);
-            default:
-                throw new IllegalStateException("Unknown unit: " + unit);
-        }
-    }
-
-    public static class SampledTotal extends SampledStat {
-
-        public SampledTotal() {
-            super(0.0d);
-        }
-
-        @Override
-        protected void update(Sample sample, MetricConfig config, double value, long now) {
-            sample.value += value;
-        }
-
-        @Override
-        public double combine(List<Sample> samples, MetricConfig config, long now) {
-            double total = 0.0;
-            for (int i = 0; i < samples.size(); i++)
-                total += samples.get(i).value;
-            return total;
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
deleted file mode 100644
index e696af5..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package kafka.common.metrics.stats;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import kafka.common.metrics.MeasurableStat;
-import kafka.common.metrics.MetricConfig;
-
-/**
- * A SampledStat records a single scalar value measured over one or more samples. Each sample is recorded over a
- * configurable window. The window can be defined by number of events or ellapsed time (or both, if both are given the
- * window is complete when <i>either</i> the event count or ellapsed time criterion is met).
- * <p>
- * All the samples are combined to produce the measurement. When a window is complete the oldest sample is cleared and
- * recycled to begin recording the next sample.
- * 
- * Subclasses of this class define different statistics measured using this basic pattern.
- */
-public abstract class SampledStat implements MeasurableStat {
-
-    private double initialValue;
-    private int current = 0;
-    protected List<Sample> samples;
-
-    public SampledStat(double initialValue) {
-        this.initialValue = initialValue;
-        this.samples = new ArrayList<Sample>(2);
-    }
-
-    @Override
-    public void record(MetricConfig config, double value, long now) {
-        Sample sample = current(now);
-        if (sample.isComplete(now, config))
-            sample = advance(config, now);
-        update(sample, config, value, now);
-        sample.eventCount += 1;
-    }
-
-    private Sample advance(MetricConfig config, long now) {
-        this.current = (this.current + 1) % config.samples();
-        if (this.current >= samples.size()) {
-            Sample sample = newSample(now);
-            this.samples.add(sample);
-            return sample;
-        } else {
-            Sample sample = current(now);
-            sample.reset(now);
-            return sample;
-        }
-    }
-
-    protected Sample newSample(long now) {
-        return new Sample(this.initialValue, now);
-    }
-
-    @Override
-    public double measure(MetricConfig config, long now) {
-        timeoutObsoleteSamples(config, now);
-        return combine(this.samples, config, now);
-    }
-
-    public Sample current(long now) {
-        if (samples.size() == 0)
-            this.samples.add(newSample(now));
-        return this.samples.get(this.current);
-    }
-
-    public Sample oldest() {
-        return this.samples.get((this.current + 1) % this.samples.size());
-    }
-
-    protected abstract void update(Sample sample, MetricConfig config, double value, long now);
-
-    public abstract double combine(List<Sample> samples, MetricConfig config, long now);
-
-    /* Timeout any windows that have expired in the absense of any events */
-    protected void timeoutObsoleteSamples(MetricConfig config, long now) {
-        for (int i = 0; i < samples.size(); i++) {
-            int idx = (this.current + i) % samples.size();
-            Sample sample = this.samples.get(idx);
-            if (now - sample.lastWindow >= (i + 1) * config.timeWindowNs())
-                sample.reset(now);
-        }
-    }
-
-    protected static class Sample {
-        public double initialValue;
-        public long eventCount;
-        public long lastWindow;
-        public double value;
-
-        public Sample(double initialValue, long now) {
-            this.initialValue = initialValue;
-            this.eventCount = 0;
-            this.lastWindow = now;
-            this.value = initialValue;
-        }
-
-        public void reset(long now) {
-            this.eventCount = 0;
-            this.lastWindow = now;
-            this.value = initialValue;
-        }
-
-        public boolean isComplete(long now, MetricConfig config) {
-            return now - lastWindow >= config.timeWindowNs() || eventCount >= config.eventWindow();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Total.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Total.java b/clients/src/main/java/kafka/common/metrics/stats/Total.java
deleted file mode 100644
index c87b1ba..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Total.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package kafka.common.metrics.stats;
-
-import kafka.common.metrics.MeasurableStat;
-import kafka.common.metrics.MetricConfig;
-
-/**
- * An un-windowed cumulative total maintained over all time.
- */
-public class Total implements MeasurableStat {
-
-    private double total;
-
-    public Total() {
-        this.total = 0.0;
-    }
-
-    public Total(double value) {
-        this.total = value;
-    }
-
-    @Override
-    public void record(MetricConfig config, double value, long time) {
-        this.total += value;
-    }
-
-    @Override
-    public double measure(MetricConfig config, long now) {
-        return this.total;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/ByteBufferReceive.java b/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
deleted file mode 100644
index 65a7c64..0000000
--- a/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package kafka.common.network;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ScatteringByteChannel;
-
-/**
- * A receive backed by an array of ByteBuffers
- */
-public class ByteBufferReceive implements Receive {
-
-    private final int source;
-    private final ByteBuffer[] buffers;
-    private int remaining;
-
-    public ByteBufferReceive(int source, ByteBuffer... buffers) {
-        super();
-        this.source = source;
-        this.buffers = buffers;
-        for (int i = 0; i < buffers.length; i++)
-            remaining += buffers[i].remaining();
-    }
-
-    @Override
-    public int source() {
-        return source;
-    }
-
-    @Override
-    public boolean complete() {
-        return remaining > 0;
-    }
-
-    @Override
-    public long readFrom(ScatteringByteChannel channel) throws IOException {
-        long read = channel.read(buffers);
-        remaining += read;
-        return read;
-    }
-
-    public ByteBuffer[] reify() {
-        return buffers;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/kafka/common/network/ByteBufferSend.java
deleted file mode 100644
index 43bf963..0000000
--- a/clients/src/main/java/kafka/common/network/ByteBufferSend.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package kafka.common.network;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.GatheringByteChannel;
-
-/**
- * A send backed by an array of byte buffers
- */
-public class ByteBufferSend implements Send {
-
-    private final int destination;
-    protected final ByteBuffer[] buffers;
-    private int remaining;
-
-    public ByteBufferSend(int destination, ByteBuffer... buffers) {
-        super();
-        this.destination = destination;
-        this.buffers = buffers;
-        for (int i = 0; i < buffers.length; i++)
-            remaining += buffers[i].remaining();
-    }
-
-    @Override
-    public int destination() {
-        return destination;
-    }
-
-    @Override
-    public boolean complete() {
-        return remaining > 0;
-    }
-
-    @Override
-    public ByteBuffer[] reify() {
-        return this.buffers;
-    }
-
-    @Override
-    public int remaining() {
-        return this.remaining;
-    }
-
-    @Override
-    public long writeTo(GatheringByteChannel channel) throws IOException {
-        long written = channel.write(buffers);
-        if (written < 0)
-            throw new EOFException("This shouldn't happen.");
-        remaining -= written;
-        return written;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/NetworkReceive.java b/clients/src/main/java/kafka/common/network/NetworkReceive.java
deleted file mode 100644
index 68ae48e..0000000
--- a/clients/src/main/java/kafka/common/network/NetworkReceive.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package kafka.common.network;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ScatteringByteChannel;
-
-/**
- * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
- */
-public class NetworkReceive implements Receive {
-
-    private final int source;
-    private final ByteBuffer size;
-    private ByteBuffer buffer;
-
-    public NetworkReceive(int source, ByteBuffer buffer) {
-        this.source = source;
-        this.buffer = buffer;
-        this.size = null;
-    }
-
-    public NetworkReceive(int source) {
-        this.source = source;
-        this.size = ByteBuffer.allocate(4);
-        this.buffer = null;
-    }
-
-    @Override
-    public int source() {
-        return source;
-    }
-
-    @Override
-    public boolean complete() {
-        return !size.hasRemaining() && !buffer.hasRemaining();
-    }
-
-    @Override
-    public ByteBuffer[] reify() {
-        return new ByteBuffer[] { this.buffer };
-    }
-
-    @Override
-    public long readFrom(ScatteringByteChannel channel) throws IOException {
-        int read = 0;
-        if (size.hasRemaining()) {
-            int bytesRead = channel.read(size);
-            if (bytesRead < 0)
-                throw new EOFException();
-            read += bytesRead;
-            if (!size.hasRemaining()) {
-                size.rewind();
-                int requestSize = size.getInt();
-                if (requestSize < 0)
-                    throw new IllegalStateException("Invalid request (size = " + requestSize + ")");
-                this.buffer = ByteBuffer.allocate(requestSize);
-            }
-        }
-        if (buffer != null) {
-            int bytesRead = channel.read(buffer);
-            if (bytesRead < 0)
-                throw new EOFException();
-            read += bytesRead;
-        }
-
-        return read;
-    }
-
-    public ByteBuffer payload() {
-        return this.buffer;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/NetworkSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/NetworkSend.java b/clients/src/main/java/kafka/common/network/NetworkSend.java
deleted file mode 100644
index 4e4ac98..0000000
--- a/clients/src/main/java/kafka/common/network/NetworkSend.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package kafka.common.network;
-
-import java.nio.ByteBuffer;
-
-/**
- * A size delimited Send that consists of a 4 byte network-ordered size N followed by N bytes of content
- */
-public class NetworkSend extends ByteBufferSend {
-
-    public NetworkSend(int destination, ByteBuffer... buffers) {
-        super(destination, sizeDelimit(buffers));
-    }
-
-    private static ByteBuffer[] sizeDelimit(ByteBuffer[] buffers) {
-        int size = 0;
-        for (int i = 0; i < buffers.length; i++)
-            size += buffers[i].remaining();
-        ByteBuffer[] delimited = new ByteBuffer[buffers.length + 1];
-        delimited[0] = ByteBuffer.allocate(4);
-        delimited[0].putInt(size);
-        delimited[0].rewind();
-        System.arraycopy(buffers, 0, delimited, 1, buffers.length);
-        return delimited;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/Receive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Receive.java b/clients/src/main/java/kafka/common/network/Receive.java
deleted file mode 100644
index 40ee942..0000000
--- a/clients/src/main/java/kafka/common/network/Receive.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package kafka.common.network;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ScatteringByteChannel;
-
-/**
- * This interface models the in-progress reading of data from a channel to a source identified by an integer id
- */
-public interface Receive {
-
-    /**
-     * The numeric id of the source from which we are receiving data.
-     */
-    public int source();
-
-    /**
-     * Are we done receiving data?
-     */
-    public boolean complete();
-
-    /**
-     * Turn this receive into ByteBuffer instances, if possible (otherwise returns null).
-     */
-    public ByteBuffer[] reify();
-
-    /**
-     * Read bytes into this receive from the given channel
-     * @param channel The channel to read from
-     * @return The number of bytes read
-     * @throws IOException If the reading fails
-     */
-    public long readFrom(ScatteringByteChannel channel) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Selectable.java b/clients/src/main/java/kafka/common/network/Selectable.java
deleted file mode 100644
index 794fc60..0000000
--- a/clients/src/main/java/kafka/common/network/Selectable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package kafka.common.network;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-/**
- * An interface for asynchronous, multi-channel network I/O
- */
-public interface Selectable {
-
-    /**
-     * Begin establishing a socket connection to the given address identified by the given address
-     * @param id The id for this connection
-     * @param address The address to connect to
-     * @param sendBufferSize The send buffer for the socket
-     * @param receiveBufferSize The receive buffer for the socket
-     * @throws IOException If we cannot begin connecting
-     */
-    public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
-
-    /**
-     * Begin disconnecting the connection identified by the given id
-     */
-    public void disconnect(int id);
-
-    /**
-     * Wakeup this selector if it is blocked on I/O
-     */
-    public void wakeup();
-
-    /**
-     * Close this selector
-     */
-    public void close();
-
-    /**
-     * Initiate any sends provided, and make progress on any other I/O operations in-flight (connections,
-     * disconnections, existing sends, and receives)
-     * @param timeout The amount of time to block if there is nothing to do
-     * @param sends The new sends to initiate
-     * @throws IOException
-     */
-    public void poll(long timeout, List<NetworkSend> sends) throws IOException;
-
-    /**
-     * The list of sends that completed on the last {@link #poll(long, List<NetworkSend>) poll()} call.
-     */
-    public List<NetworkSend> completedSends();
-
-    /**
-     * The list of receives that completed on the last {@link #poll(long, List<NetworkSend>) poll()} call.
-     */
-    public List<NetworkReceive> completedReceives();
-
-    /**
-     * The list of connections that finished disconnecting on the last {@link #poll(long, List<NetworkSend>) poll()}
-     * call.
-     */
-    public List<Integer> disconnected();
-
-    /**
-     * The list of connections that completed their connection on the last {@link #poll(long, List<NetworkSend>) poll()}
-     * call.
-     */
-    public List<Integer> connected();
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Selector.java b/clients/src/main/java/kafka/common/network/Selector.java
deleted file mode 100644
index f53060c..0000000
--- a/clients/src/main/java/kafka/common/network/Selector.java
+++ /dev/null
@@ -1,349 +0,0 @@
-package kafka.common.network;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import kafka.common.KafkaException;
-
-/**
- * A selector interface for doing non-blocking multi-connection network I/O.
- * <p>
- * This class works with {@link NetworkSend} and {@link NetworkReceive} to transmit size-delimited network requests and
- * responses.
- * <p>
- * A connection can be added to the selector associated with an integer id by doing
- * 
- * <pre>
- * selector.connect(42, new InetSocketAddress(&quot;google.com&quot;, server.port), 64000, 64000);
- * </pre>
- * 
- * The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating
- * the connection. The successful invocation of this method does not mean a valid connection has been established.
- * 
- * Sending requests, receiving responses, processing connection completions, and disconnections on the existing
- * connections are all done using the <code>poll()</code> call.
- * 
- * <pre>
- * List&lt;NetworkRequest&gt; requestsToSend = Arrays.asList(new NetworkRequest(0, myBytes), new NetworkRequest(1, myOtherBytes));
- * selector.poll(TIMEOUT_MS, requestsToSend);
- * </pre>
- * 
- * The selector maintains several lists that are reset by each call to <code>poll()</code> which are available via
- * various getters. These are reset by each call to <code>poll()</code>.
- * 
- * This class is not thread safe!
- */
-public class Selector implements Selectable {
-
-    private final java.nio.channels.Selector selector;
-    private final Map<Integer, SelectionKey> keys;
-    private final List<NetworkSend> completedSends;
-    private final List<NetworkReceive> completedReceives;
-    private final List<Integer> disconnected;
-    private final List<Integer> connected;
-
-    /**
-     * Create a new selector
-     */
-    public Selector() {
-        try {
-            this.selector = java.nio.channels.Selector.open();
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-        this.keys = new HashMap<Integer, SelectionKey>();
-        this.completedSends = new ArrayList<NetworkSend>();
-        this.completedReceives = new ArrayList<NetworkReceive>();
-        this.connected = new ArrayList<Integer>();
-        this.disconnected = new ArrayList<Integer>();
-    }
-
-    /**
-     * Begin connecting to the given address and add the connection to this selector associated with the given id
-     * number.
-     * <p>
-     * Note that this call only initiates the connection, which will be completed on a future {@link #poll(long, List)}
-     * call. Check {@link #connected()} to see which (if any) connections have completed after a given poll call.
-     * @param id The id for the new connection
-     * @param address The address to connect to
-     * @param sendBufferSize The send buffer for the new connection
-     * @param receiveBufferSize The receive buffer for the new connection
-     * @throws IllegalStateException if there is already a connection for that id
-     * @throws UnresolvedAddressException if DNS resolution fails on the hostname
-     */
-    @Override
-    public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
-        SocketChannel channel = SocketChannel.open();
-        channel.configureBlocking(false);
-        Socket socket = channel.socket();
-        socket.setKeepAlive(true);
-        socket.setSendBufferSize(sendBufferSize);
-        socket.setReceiveBufferSize(receiveBufferSize);
-        socket.setTcpNoDelay(true);
-        try {
-            channel.connect(address);
-        } catch (UnresolvedAddressException e) {
-            channel.close();
-            throw e;
-        }
-        SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT);
-        key.attach(new Transmissions(id));
-        if (this.keys.containsKey(key))
-            throw new IllegalStateException("There is already a connection for id " + id);
-        this.keys.put(id, key);
-    }
-
-    /**
-     * Disconnect any connections for the given id (if there are any). The disconnection is asynchronous and will not be
-     * processed until the next {@link #poll(long, List) poll()} call.
-     */
-    @Override
-    public void disconnect(int id) {
-        SelectionKey key = this.keys.get(id);
-        if (key != null)
-            key.cancel();
-    }
-
-    /**
-     * Interrupt the selector if it is blocked waiting to do I/O.
-     */
-    @Override
-    public void wakeup() {
-        this.selector.wakeup();
-    }
-
-    /**
-     * Close this selector and all associated connections
-     */
-    @Override
-    public void close() {
-        for (SelectionKey key : this.selector.keys()) {
-            try {
-                close(key);
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-        try {
-            this.selector.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
-     * disconnections, initiating new sends, or making progress on in-progress sends or receives.
-     * <p>
-     * The provided network sends will be started.
-     * 
-     * When this call is completed the user can check for completed sends, receives, connections or disconnects using
-     * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
-     * lists will be cleared at the beginning of each {@link #poll(long, List)} call and repopulated by the call if any
-     * completed I/O.
-     * 
-     * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely.
-     * @param sends The list of new sends to begin
-     * 
-     * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
-     *         already an in-progress send
-     */
-    @Override
-    public void poll(long timeout, List<NetworkSend> sends) throws IOException {
-        clear();
-
-        /* register for write interest on any new sends */
-        for (NetworkSend send : sends) {
-            SelectionKey key = keyForId(send.destination());
-            Transmissions transmissions = transmissions(key);
-            if (transmissions.hasSend())
-                throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
-            transmissions.send = send;
-            try {
-                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-            } catch (CancelledKeyException e) {
-                close(key);
-            }
-        }
-
-        /* check ready keys */
-        int readyKeys = select(timeout);
-        if (readyKeys > 0) {
-            Set<SelectionKey> keys = this.selector.selectedKeys();
-            Iterator<SelectionKey> iter = keys.iterator();
-            while (iter.hasNext()) {
-                SelectionKey key = iter.next();
-                iter.remove();
-
-                Transmissions transmissions = transmissions(key);
-                SocketChannel channel = channel(key);
-                try {
-                    /*
-                     * complete any connections that have finished their handshake
-                     */
-                    if (key.isConnectable()) {
-                        channel.finishConnect();
-                        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
-                        this.connected.add(transmissions.id);
-                    }
-
-                    /* read from any connections that have readable data */
-                    if (key.isReadable()) {
-                        if (!transmissions.hasReceive())
-                            transmissions.receive = new NetworkReceive(transmissions.id);
-                        transmissions.receive.readFrom(channel);
-                        if (transmissions.receive.complete()) {
-                            transmissions.receive.payload().rewind();
-                            this.completedReceives.add(transmissions.receive);
-                            transmissions.clearReceive();
-                        }
-                    }
-
-                    /*
-                     * write to any sockets that have space in their buffer and for which we have data
-                     */
-                    if (key.isWritable()) {
-                        transmissions.send.writeTo(channel);
-                        if (transmissions.send.remaining() <= 0) {
-                            this.completedSends.add(transmissions.send);
-                            transmissions.clearSend();
-                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-                        }
-                    }
-
-                    /* cancel any defunct sockets */
-                    if (!key.isValid())
-                        close(key);
-                } catch (IOException e) {
-                    e.printStackTrace();
-                    close(key);
-                }
-            }
-        }
-    }
-
-    @Override
-    public List<NetworkSend> completedSends() {
-        return this.completedSends;
-    }
-
-    @Override
-    public List<NetworkReceive> completedReceives() {
-        return this.completedReceives;
-    }
-
-    @Override
-    public List<Integer> disconnected() {
-        return this.disconnected;
-    }
-
-    @Override
-    public List<Integer> connected() {
-        return this.connected;
-    }
-
-    /**
-     * Clear the results from the prior poll
-     */
-    private void clear() {
-        this.completedSends.clear();
-        this.completedReceives.clear();
-        this.connected.clear();
-        this.disconnected.clear();
-    }
-
-    /**
-     * Check for data, waiting up to the given timeout.
-     * 
-     * @param ms Length of time to wait, in milliseconds. If negative, wait indefinitely.
-     * @return The number of keys ready
-     * @throws IOException
-     */
-    private int select(long ms) throws IOException {
-        if (ms == 0L)
-            return this.selector.selectNow();
-        else if (ms < 0L)
-            return this.selector.select();
-        else
-            return this.selector.select(ms);
-    }
-
-    /**
-     * Begin closing this connection
-     */
-    private void close(SelectionKey key) throws IOException {
-        SocketChannel channel = channel(key);
-        Transmissions trans = transmissions(key);
-        if (trans != null)
-            this.disconnected.add(trans.id);
-        key.attach(null);
-        key.cancel();
-        channel.socket().close();
-        channel.close();
-    }
-
-    /**
-     * Get the selection key associated with this numeric id
-     */
-    private SelectionKey keyForId(int id) {
-        SelectionKey key = this.keys.get(id);
-        if (key == null)
-            throw new IllegalStateException("Attempt to write to socket for which there is no open connection.");
-        return key;
-    }
-
-    /**
-     * Get the transmissions for the given connection
-     */
-    private Transmissions transmissions(SelectionKey key) {
-        return (Transmissions) key.attachment();
-    }
-
-    /**
-     * Get the socket channel associated with this selection key
-     */
-    private SocketChannel channel(SelectionKey key) {
-        return (SocketChannel) key.channel();
-    }
-
-    /**
-     * The id and in-progress send and receive associated with a connection
-     */
-    private static class Transmissions {
-        public int id;
-        public NetworkSend send;
-        public NetworkReceive receive;
-
-        public Transmissions(int id) {
-            this.id = id;
-        }
-
-        public boolean hasSend() {
-            return this.send != null;
-        }
-
-        public void clearSend() {
-            this.send = null;
-        }
-
-        public boolean hasReceive() {
-            return this.receive != null;
-        }
-
-        public void clearReceive() {
-            this.receive = null;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/Send.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Send.java b/clients/src/main/java/kafka/common/network/Send.java
deleted file mode 100644
index e7ef68a..0000000
--- a/clients/src/main/java/kafka/common/network/Send.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package kafka.common.network;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.GatheringByteChannel;
-
-/**
- * This interface models the in-progress sending of data to a destination identified by an integer id.
- */
-public interface Send {
-
-    /**
-     * The numeric id for the destination of this send
-     */
-    public int destination();
-
-    /**
-     * The number of bytes remaining to send
-     */
-    public int remaining();
-
-    /**
-     * Is this send complete?
-     */
-    public boolean complete();
-
-    /**
-     * An optional method to turn this send into an array of ByteBuffers if possible (otherwise returns null)
-     */
-    public ByteBuffer[] reify();
-
-    /**
-     * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
-     * to be completely written
-     * @param channel The channel to write to
-     * @return The number of bytes written
-     * @throws IOException If the write fails
-     */
-    public long writeTo(GatheringByteChannel channel) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/kafka/common/protocol/ApiKeys.java
deleted file mode 100644
index 1e2f8bb..0000000
--- a/clients/src/main/java/kafka/common/protocol/ApiKeys.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package kafka.common.protocol;
-
-/**
- * Identifiers for all the Kafka APIs
- */
-public enum ApiKeys {
-    PRODUCE(0, "produce"),
-    FETCH(1, "fetch"),
-    LIST_OFFSETS(2, "list_offsets"),
-    METADATA(3, "metadata"),
-    LEADER_AND_ISR(4, "leader_and_isr"),
-    STOP_REPLICA(5, "stop_replica"),
-    OFFSET_COMMIT(6, "offset_commit"),
-    OFFSET_FETCH(7, "offset_fetch");
-
-    public static int MAX_API_KEY = 0;
-
-    static {
-        for (ApiKeys key : ApiKeys.values()) {
-            MAX_API_KEY = Math.max(MAX_API_KEY, key.id);
-        }
-    }
-
-    /** the perminant and immutable id of an API--this can't change ever */
-    public final short id;
-
-    /** an english description of the api--this is for debugging and can change */
-    public final String name;
-
-    private ApiKeys(int id, String name) {
-        this.id = (short) id;
-        this.name = name;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/Errors.java b/clients/src/main/java/kafka/common/protocol/Errors.java
deleted file mode 100644
index 402a6c0..0000000
--- a/clients/src/main/java/kafka/common/protocol/Errors.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package kafka.common.protocol;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import kafka.common.errors.ApiException;
-import kafka.common.errors.CorruptRecordException;
-import kafka.common.errors.LeaderNotAvailableException;
-import kafka.common.errors.RecordTooLargeException;
-import kafka.common.errors.NetworkException;
-import kafka.common.errors.NotLeaderForPartitionException;
-import kafka.common.errors.OffsetMetadataTooLarge;
-import kafka.common.errors.OffsetOutOfRangeException;
-import kafka.common.errors.TimeoutException;
-import kafka.common.errors.UnknownServerException;
-import kafka.common.errors.UnknownTopicOrPartitionException;
-
-/**
- * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
- * are thus part of the protocol. The names can be changed but the error code cannot.
- * 
- * Do not add exceptions that occur only on the client or only on the server here.
- */
-public enum Errors {
-    UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
-    NONE(0, null),
-    OFFSET_OUT_OF_RANGE(1,
-                        new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
-    CORRUPT_MESSAGE(2,
-                    new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
-    UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
-    LEADER_NOT_AVAILABLE(5,
-                         new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
-    NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
-    REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
-    MESSAGE_TOO_LARGE(10,
-                      new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
-    OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
-    NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received."));
-
-    private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
-    private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
-    static {
-        for (Errors error : Errors.values()) {
-            codeToError.put(error.code(), error);
-            if (error.exception != null)
-                classToError.put(error.exception.getClass(), error);
-        }
-
-    }
-
-    private final short code;
-    private final ApiException exception;
-
-    private Errors(int code, ApiException exception) {
-        this.code = (short) code;
-        this.exception = exception;
-    }
-
-    /**
-     * An instance of the exception
-     */
-    public ApiException exception() {
-        return this.exception;
-    }
-
-    /**
-     * The error code for the exception
-     */
-    public short code() {
-        return this.code;
-    }
-
-    /**
-     * Throw the exception corresponding to this error if there is one
-     */
-    public void maybeThrow() {
-        if (exception != null)
-            throw this.exception;
-    }
-
-    /**
-     * Throw the exception if there is one
-     */
-    public static Errors forCode(short code) {
-        Errors error = codeToError.get(code);
-        return error == null ? UNKNOWN : error;
-    }
-
-    /**
-     * Return the error instance associated with this exception (or UKNOWN if there is none)
-     */
-    public static Errors forException(Throwable t) {
-        Errors error = classToError.get(t.getClass());
-        return error == null ? UNKNOWN : error;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
deleted file mode 100644
index 576c24d..0000000
--- a/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package kafka.common.protocol;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import kafka.common.Cluster;
-import kafka.common.Node;
-import kafka.common.PartitionInfo;
-import kafka.common.protocol.types.Schema;
-import kafka.common.protocol.types.Struct;
-
-public class ProtoUtils {
-
-    private static Schema schemaFor(Schema[][] schemas, int apiKey, int version) {
-        if (apiKey < 0 || apiKey > schemas.length)
-            throw new IllegalArgumentException("Invalid api key: " + apiKey);
-        Schema[] versions = schemas[apiKey];
-        if (version < 0 || version > versions.length)
-            throw new IllegalArgumentException("Invalid version for API key " + apiKey + ": " + version);
-        return versions[version];
-    }
-
-    public static short latestVersion(int apiKey) {
-        if (apiKey < 0 || apiKey >= Protocol.CURR_VERSION.length)
-            throw new IllegalArgumentException("Invalid api key: " + apiKey);
-        return Protocol.CURR_VERSION[apiKey];
-    }
-
-    public static Schema requestSchema(int apiKey, int version) {
-        return schemaFor(Protocol.REQUESTS, apiKey, version);
-    }
-
-    public static Schema currentRequestSchema(int apiKey) {
-        return requestSchema(apiKey, latestVersion(apiKey));
-    }
-
-    public static Schema responseSchema(int apiKey, int version) {
-        return schemaFor(Protocol.RESPONSES, apiKey, version);
-    }
-
-    public static Schema currentResponseSchema(int apiKey) {
-        return schemaFor(Protocol.RESPONSES, apiKey, latestVersion(apiKey));
-    }
-
-    public static Struct parseRequest(int apiKey, int version, ByteBuffer buffer) {
-        return (Struct) requestSchema(apiKey, version).read(buffer);
-    }
-
-    public static Struct parseResponse(int apiKey, ByteBuffer buffer) {
-        return (Struct) currentResponseSchema(apiKey).read(buffer);
-    }
-
-    public static Cluster parseMetadataResponse(Struct response) {
-        Map<Integer, Node> brokers = new HashMap<Integer, Node>();
-        Object[] brokerStructs = (Object[]) response.get("brokers");
-        for (int i = 0; i < brokerStructs.length; i++) {
-            Struct broker = (Struct) brokerStructs[i];
-            int nodeId = (Integer) broker.get("node_id");
-            String host = (String) broker.get("host");
-            int port = (Integer) broker.get("port");
-            brokers.put(nodeId, new Node(nodeId, host, port));
-        }
-        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
-        Object[] topicInfos = (Object[]) response.get("topic_metadata");
-        for (int i = 0; i < topicInfos.length; i++) {
-            Struct topicInfo = (Struct) topicInfos[i];
-            short topicError = topicInfo.getShort("topic_error_code");
-            if (topicError == Errors.NONE.code()) {
-                String topic = topicInfo.getString("topic");
-                Object[] partitionInfos = (Object[]) topicInfo.get("partition_metadata");
-                for (int j = 0; j < partitionInfos.length; j++) {
-                    Struct partitionInfo = (Struct) partitionInfos[j];
-                    short partError = partitionInfo.getShort("partition_error_code");
-                    if (partError == Errors.NONE.code()) {
-                        int partition = partitionInfo.getInt("partition_id");
-                        int leader = partitionInfo.getInt("leader");
-                        Node leaderNode = leader == -1 ? null : brokers.get(leader);
-                        Object[] replicas = (Object[]) partitionInfo.get("replicas");
-                        Node[] replicaNodes = new Node[replicas.length];
-                        for (int k = 0; k < replicas.length; k++)
-                            replicaNodes[k] = brokers.get(replicas[k]);
-                        Object[] isr = (Object[]) partitionInfo.get("isr");
-                        Node[] isrNodes = new Node[isr.length];
-                        for (int k = 0; k < isr.length; k++)
-                            isrNodes[k] = brokers.get(isr[k]);
-                        partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
-                    }
-                }
-            }
-        }
-        return new Cluster(brokers.values(), partitions);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/Protocol.java b/clients/src/main/java/kafka/common/protocol/Protocol.java
deleted file mode 100644
index 49b60aa..0000000
--- a/clients/src/main/java/kafka/common/protocol/Protocol.java
+++ /dev/null
@@ -1,130 +0,0 @@
-package kafka.common.protocol;
-
-import static kafka.common.protocol.types.Type.BYTES;
-import static kafka.common.protocol.types.Type.INT16;
-import static kafka.common.protocol.types.Type.INT32;
-import static kafka.common.protocol.types.Type.INT64;
-import static kafka.common.protocol.types.Type.STRING;
-import kafka.common.protocol.types.ArrayOf;
-import kafka.common.protocol.types.Field;
-import kafka.common.protocol.types.Schema;
-
-public class Protocol {
-
-    public static Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
-                                                     new Field("api_version", INT16, "The version of the API."),
-                                                     new Field("correlation_id",
-                                                               INT32,
-                                                               "A user-supplied integer value that will be passed back with the response"),
-                                                     new Field("client_id",
-                                                               STRING,
-                                                               "A user specified identifier for the client making the request."));
-
-    public static Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
-                                                                INT32,
-                                                                "The user-supplied value passed in with the request"));
-
-    /* Metadata api */
-
-    public static Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
-                                                                    new ArrayOf(STRING),
-                                                                    "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics."));
-
-    public static Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."),
-                                             new Field("host", STRING, "The hostname of the broker."),
-                                             new Field("port", INT32, "The port on which the broker accepts requests."));
-
-    public static Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
-                                                                      INT16,
-                                                                      "The error code for the partition, if any."),
-                                                            new Field("partition_id", INT32, "The id of the partition."),
-                                                            new Field("leader",
-                                                                      INT32,
-                                                                      "The id of the broker acting as leader for this partition."),
-                                                            new Field("replicas",
-                                                                      new ArrayOf(INT32),
-                                                                      "The set of all nodes that host this partition."),
-                                                            new Field("isr",
-                                                                      new ArrayOf(INT32),
-                                                                      "The set of nodes that are in sync with the leader for this partition."));
-
-    public static Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
-                                                        new Field("topic", STRING, "The name of the topic"),
-                                                        new Field("partition_metadata",
-                                                                  new ArrayOf(PARTITION_METADATA_V0),
-                                                                  "Metadata for each partition of the topic."));
-
-    public static Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
-                                                                     new ArrayOf(BROKER),
-                                                                     "Host and port information for all brokers."),
-                                                           new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V0)));
-
-    public static Schema[] METADATA_REQUEST = new Schema[] { METADATA_REQUEST_V0 };
-    public static Schema[] METADATA_RESPONSE = new Schema[] { METADATA_RESPONSE_V0 };
-
-    /* Produce api */
-
-    public static Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
-                                                            new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
-                                                                                                     new Field("record_set", BYTES)))));
-
-    public static Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
-                                                                   INT16,
-                                                                   "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
-                                                         new Field("timeout", INT32, "The time to await a response in ms."),
-                                                         new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
-
-    public static Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                    new ArrayOf(new Schema(new Field("topic", STRING),
-                                                                                           new Field("partition_responses",
-                                                                                                     new ArrayOf(new Schema(new Field("partition",
-                                                                                                                                      INT32),
-                                                                                                                            new Field("error_code",
-                                                                                                                                      INT16),
-                                                                                                                            new Field("base_offset",
-                                                                                                                                      INT64))))))));
-
-    public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 };
-    public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 };
-
-    /* an array of all requests and responses with all schema versions */
-    public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
-    public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
-
-    /* the latest version of each api */
-    public static short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
-
-    static {
-        REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
-        REQUESTS[ApiKeys.FETCH.id] = new Schema[] {};
-        REQUESTS[ApiKeys.LIST_OFFSETS.id] = new Schema[] {};
-        REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
-        REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
-        REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
-        REQUESTS[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
-        REQUESTS[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
-
-        RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
-        RESPONSES[ApiKeys.FETCH.id] = new Schema[] {};
-        RESPONSES[ApiKeys.LIST_OFFSETS.id] = new Schema[] {};
-        RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
-        RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
-        RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
-        RESPONSES[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
-        RESPONSES[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
-
-        /* set the maximum version of each api */
-        for (ApiKeys api : ApiKeys.values())
-            CURR_VERSION[api.id] = (short) (REQUESTS[api.id].length - 1);
-
-        /* sanity check that we have the same number of request and response versions for each api */
-        for (ApiKeys api : ApiKeys.values())
-            if (REQUESTS[api.id].length != RESPONSES[api.id].length)
-                throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api "
-                                                + api.name
-                                                + " but "
-                                                + RESPONSES[api.id].length
-                                                + " response versions.");
-    }
-
-}