You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/01/28 20:16:31 UTC
[5/7] KAFKA-1227 New producer!
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Metrics.java b/clients/src/main/java/kafka/common/metrics/Metrics.java
new file mode 100644
index 0000000..f2cb782
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/Metrics.java
@@ -0,0 +1,190 @@
+package kafka.common.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import kafka.common.utils.SystemTime;
+import kafka.common.utils.Time;
+import kafka.common.utils.Utils;
+
+/**
+ * A registry of sensors and metrics.
+ * <p>
+ * A metric is a named, numerical measurement. A sensor is a handle to record numerical measurements as they occur. Each
+ * Sensor has zero or more associated metrics. For example a Sensor might represent message sizes and we might associate
+ * with this sensor a metric for the average, maximum, or other statistics computed off the sequence of message sizes
+ * that are recorded by the sensor.
+ * <p>
+ * Usage looks something like this:
+ *
+ * <pre>
+ * // set up metrics:
+ * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
+ * Sensor sensor = metrics.sensor("message-sizes");
+ * sensor.add("kafka.producer.message-sizes.avg", new Avg());
+ * sensor.add("kafka.producer.message-sizes.max", new Max());
+ *
+ * // as messages are sent we record the sizes
+ * sensor.record(messageSize);
+ * </pre>
+ */
+public class Metrics {
+
+    /** Default configuration for sensors and metrics that do not supply their own. */
+    private final MetricConfig config;
+    /** All registered metrics, keyed by metric name. */
+    private final ConcurrentMap<String, KafkaMetric> metrics;
+    /** All sensors created through this registry, keyed by sensor name. */
+    private final ConcurrentMap<String, Sensor> sensors;
+    /** Reporters notified of metric registrations; read and written while holding this instance's monitor. */
+    private final List<MetricsReporter> reporters;
+    /** Time instance handed to every sensor and metric created by this registry. */
+    private final Time time;
+
+    /**
+     * Create a metrics repository with no metric reporters and default configuration.
+     */
+    public Metrics() {
+        this(new MetricConfig());
+    }
+
+    /**
+     * Create a metrics repository with no metric reporters and default configuration that uses the given time
+     * instance.
+     * @param time The time instance to use with the metrics
+     */
+    public Metrics(Time time) {
+        this(new MetricConfig(), new ArrayList<MetricsReporter>(), time);
+    }
+
+    /**
+     * Create a metrics repository with no reporters and the given default config. This config will be used for any
+     * metric that doesn't override its own config.
+     * @param defaultConfig The default config to use for all metrics that don't override their config
+     */
+    public Metrics(MetricConfig defaultConfig) {
+        this(defaultConfig, new ArrayList<MetricsReporter>(0), new SystemTime());
+    }
+
+    /**
+     * Create a metrics repository with a default config and the given metric reporters. Every reporter is
+     * initialized with an empty list of metrics, since none have been registered yet.
+     * @param defaultConfig The default config
+     * @param reporters The metrics reporters
+     * @param time The time instance to use with the metrics
+     */
+    public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time) {
+        this.config = defaultConfig;
+        this.sensors = new ConcurrentHashMap<String, Sensor>();
+        this.metrics = new ConcurrentHashMap<String, KafkaMetric>();
+        this.reporters = Utils.notNull(reporters);
+        this.time = time;
+        for (MetricsReporter reporter : reporters)
+            reporter.init(new ArrayList<KafkaMetric>());
+    }
+
+    /**
+     * Get the sensor registered under the given name, creating it with the registry's default config if it does not
+     * exist yet. All parent sensors will receive every value recorded with this sensor.
+     * @param name The name of the sensor
+     * @param parents The parent sensors
+     * @return The existing sensor with that name, or the newly created one
+     */
+    public Sensor sensor(String name, Sensor... parents) {
+        return sensor(name, null, parents);
+    }
+
+    /**
+     * Get the sensor registered under the given name, creating it if it does not exist yet. All parent sensors will
+     * receive every value recorded with this sensor. If a sensor with this name already exists it is returned as-is
+     * and the given config and parents are not applied to it.
+     * @param name The name of the sensor
+     * @param config A default configuration to use for this sensor for metrics that don't have their own config; if
+     *        null the registry's default config is used
+     * @param parents The parent sensors
+     * @return The existing sensor with that name, or the newly created one
+     */
+    public synchronized Sensor sensor(String name, MetricConfig config, Sensor... parents) {
+        Sensor s = this.sensors.get(Utils.notNull(name));
+        if (s == null) {
+            s = new Sensor(this, name, parents, config == null ? this.config : config, time);
+            this.sensors.put(name, s);
+        }
+        return s;
+    }
+
+    /**
+     * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
+     * This is a way to expose existing values as metrics.
+     * @param name The name of the metric
+     * @param measurable The measurable that will be measured by this metric
+     */
+    public void addMetric(String name, Measurable measurable) {
+        addMetric(name, "", measurable);
+    }
+
+    /**
+     * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
+     * This is a way to expose existing values as metrics.
+     * @param name The name of the metric
+     * @param description A human-readable description to include in the metric
+     * @param measurable The measurable that will be measured by this metric
+     */
+    public void addMetric(String name, String description, Measurable measurable) {
+        addMetric(name, description, null, measurable);
+    }
+
+    /**
+     * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
+     * This is a way to expose existing values as metrics.
+     * @param name The name of the metric
+     * @param config The configuration to use when measuring this measurable
+     * @param measurable The measurable that will be measured by this metric
+     */
+    public void addMetric(String name, MetricConfig config, Measurable measurable) {
+        addMetric(name, "", config, measurable);
+    }
+
+    /**
+     * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
+     * This is a way to expose existing values as metrics.
+     * @param name The name of the metric
+     * @param description A human-readable description to include in the metric
+     * @param config The configuration to use when measuring this measurable; if null the registry's default config
+     *        is used
+     * @param measurable The measurable that will be measured by this metric
+     * @throws IllegalArgumentException if a metric with this name is already registered
+     */
+    public synchronized void addMetric(String name, String description, MetricConfig config, Measurable measurable) {
+        KafkaMetric m = new KafkaMetric(new Object(),
+                                        Utils.notNull(name),
+                                        Utils.notNull(description),
+                                        Utils.notNull(measurable),
+                                        config == null ? this.config : config,
+                                        time);
+        registerMetric(m);
+    }
+
+    /**
+     * Add a MetricReporter. The reporter is first initialized with a snapshot of all currently registered metrics
+     * and from then on is notified of every newly registered metric.
+     * @param reporter The reporter to add
+     */
+    public synchronized void addReporter(MetricsReporter reporter) {
+        Utils.notNull(reporter).init(new ArrayList<KafkaMetric>(metrics.values()));
+        this.reporters.add(reporter);
+    }
+
+    /**
+     * Register the given metric under its name and notify all reporters of it.
+     * @param metric The metric to register
+     * @throws IllegalArgumentException if a metric with the same name is already registered
+     */
+    synchronized void registerMetric(KafkaMetric metric) {
+        if (this.metrics.containsKey(metric.name()))
+            throw new IllegalArgumentException("A metric named '" + metric.name() + "' already exists, can't register another one.");
+        this.metrics.put(metric.name(), metric);
+        for (MetricsReporter reporter : reporters)
+            reporter.metricChange(metric);
+    }
+
+    /**
+     * Get all the metrics currently maintained indexed by metric name. The returned map is the registry's own live
+     * map, not a copy.
+     */
+    public Map<String, KafkaMetric> metrics() {
+        return this.metrics;
+    }
+
+    /**
+     * Close this metrics repository by closing every registered reporter.
+     * <p>
+     * This method is synchronized because {@link #addReporter(MetricsReporter)} appends to the reporter list while
+     * holding the same lock; iterating the list unguarded could fail if a reporter were added concurrently.
+     */
+    public synchronized void close() {
+        for (MetricsReporter reporter : this.reporters)
+            reporter.close();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/kafka/common/metrics/MetricsReporter.java
new file mode 100644
index 0000000..bf0b39e
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/MetricsReporter.java
@@ -0,0 +1,27 @@
+package kafka.common.metrics;
+
+import java.util.List;
+
+/**
+ * A plugin interface to allow things to listen as new metrics are created so they can be reported
+ */
+public interface MetricsReporter {
+
+    /**
+     * Invoked once when the reporter is registered, handing it every metric that already exists at that point.
+     * @param metrics All currently existing metrics
+     */
+    void init(List<KafkaMetric> metrics);
+
+    /**
+     * Invoked each time a metric is added or updated.
+     * @param metric The metric that was added or updated
+     */
+    void metricChange(KafkaMetric metric);
+
+    /**
+     * Invoked when the metrics repository is closed.
+     */
+    void close();
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/Quota.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Quota.java b/clients/src/main/java/kafka/common/metrics/Quota.java
new file mode 100644
index 0000000..6278246
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/Quota.java
@@ -0,0 +1,36 @@
+package kafka.common.metrics;
+
+/**
+ * An upper or lower bound for metrics
+ */
+public final class Quota {
+
+    /** The limiting value. */
+    private final double bound;
+    /** True if {@link #bound} is a maximum, false if it is a minimum. */
+    private final boolean upper;
+
+    /**
+     * Create a quota.
+     * @param bound The limiting value
+     * @param upper True if the bound is an upper bound, false if it is a lower bound
+     */
+    public Quota(double bound, boolean upper) {
+        this.upper = upper;
+        this.bound = bound;
+    }
+
+    /**
+     * Create a quota that accepts values up to and including the given upper bound.
+     */
+    public static Quota lessThan(double upperBound) {
+        final boolean isUpper = true;
+        return new Quota(upperBound, isUpper);
+    }
+
+    /**
+     * Create a quota that accepts values down to and including the given lower bound.
+     */
+    public static Quota moreThan(double lowerBound) {
+        final boolean isUpper = false;
+        return new Quota(lowerBound, isUpper);
+    }
+
+    /**
+     * @return True if this quota is an upper bound, false if it is a lower bound
+     */
+    public boolean isUpperBound() {
+        return upper;
+    }
+
+    /**
+     * @return The limiting value of this quota
+     */
+    public double bound() {
+        return bound;
+    }
+
+    /**
+     * Check a value against this quota. The bound itself is acceptable in both directions.
+     * @param value The value to check
+     * @return True if the value is within the quota
+     */
+    public boolean acceptable(double value) {
+        return upper ? value <= bound : value >= bound;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java b/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java
new file mode 100644
index 0000000..b9005cd
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java
@@ -0,0 +1,16 @@
+package kafka.common.metrics;
+
+import kafka.common.KafkaException;
+
+/**
+ * Thrown when a sensor records a value that causes a metric to go outside the bounds configured as its quota
+ */
+public class QuotaViolationException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Create the exception with a message describing the violated quota.
+     * @param m The detail message
+     */
+    public QuotaViolationException(String m) {
+        super(m);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Sensor.java b/clients/src/main/java/kafka/common/metrics/Sensor.java
new file mode 100644
index 0000000..9c11835
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/Sensor.java
@@ -0,0 +1,171 @@
+package kafka.common.metrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import kafka.common.metrics.CompoundStat.NamedMeasurable;
+import kafka.common.utils.Time;
+import kafka.common.utils.Utils;
+
+/**
+ * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
+ * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
+ * of metrics about message sizes such as the average or max.
+ */
+public final class Sensor {
+
+    /** The registry every metric added to this sensor is registered with. */
+    private final Metrics registry;
+    private final String name;
+    /** Sensors that also receive every value recorded here; never null. */
+    private final Sensor[] parents;
+    /** Every stat that is updated on each recorded value. */
+    private final List<Stat> stats;
+    /** Every metric exposed by this sensor; these are checked against their quotas on each record. */
+    private final List<KafkaMetric> metrics;
+    /** Default config for metrics added without their own config. */
+    private final MetricConfig config;
+    private final Time time;
+
+    /**
+     * Create a sensor.
+     * @param registry The registry to register this sensor's metrics with
+     * @param name The name of the sensor
+     * @param parents The parent sensors; null is treated as "no parents"
+     * @param config The default config for metrics of this sensor
+     * @param time The time instance used to timestamp recorded values
+     * @throws IllegalArgumentException if a sensor is reached more than once while walking up through the parents
+     */
+    Sensor(Metrics registry, String name, Sensor[] parents, MetricConfig config, Time time) {
+        this.registry = registry;
+        this.name = Utils.notNull(name);
+        // A null array would otherwise blow up with a NullPointerException in checkForest() and record()
+        this.parents = parents == null ? new Sensor[0] : parents;
+        this.metrics = new ArrayList<KafkaMetric>();
+        this.stats = new ArrayList<Stat>();
+        this.config = config;
+        this.time = time;
+        checkForest(new HashSet<Sensor>());
+    }
+
+    /* Validate that this sensor doesn't end up referencing itself */
+    private void checkForest(Set<Sensor> sensors) {
+        // add() returns false when the sensor was already visited on this walk
+        if (!sensors.add(this))
+            throw new IllegalArgumentException("Circular dependency in sensors: " + name() + " is its own parent.");
+        for (int i = 0; i < parents.length; i++)
+            parents[i].checkForest(sensors);
+    }
+
+    /**
+     * The name this sensor is registered with. This name will be unique among all registered sensors.
+     */
+    public String name() {
+        return this.name;
+    }
+
+    /**
+     * Record an occurrence, this is just short-hand for {@link #record(double) record(1.0)}
+     */
+    public void record() {
+        record(1.0);
+    }
+
+    /**
+     * Record a value with this sensor
+     * @param value The value to record
+     * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
+     *         bound
+     */
+    public void record(double value) {
+        record(value, time.nanoseconds());
+    }
+
+    /**
+     * Record the value in every stat of this sensor, check the quotas, then forward the value to every parent.
+     * The same timestamp is used for this sensor and all its parents. If a quota is violated the exception
+     * propagates before the parents are updated.
+     */
+    private void record(double value, long time) {
+        synchronized (this) {
+            // increment all the stats
+            for (int i = 0; i < this.stats.size(); i++)
+                this.stats.get(i).record(config, value, time);
+            checkQuotas(time);
+
+        }
+        // parents are updated outside of this sensor's lock; each parent takes its own lock
+        for (int i = 0; i < parents.length; i++)
+            parents[i].record(value, time);
+    }
+
+    /**
+     * Check every metric that has a quota configured against its current value.
+     * @param time The timestamp at which to measure the metrics
+     * @throws QuotaViolationException for the first metric whose value is not acceptable to its quota
+     */
+    private void checkQuotas(long time) {
+        for (int i = 0; i < this.metrics.size(); i++) {
+            KafkaMetric metric = this.metrics.get(i);
+            MetricConfig config = metric.config();
+            if (config != null) {
+                Quota quota = config.quota();
+                if (quota != null)
+                    if (!quota.acceptable(metric.value(time)))
+                        throw new QuotaViolationException("Metric " + metric.name() + " is in violation of its quota of " + quota.bound());
+            }
+        }
+    }
+
+    /**
+     * Register a compound statistic with this sensor with no config override
+     */
+    public void add(CompoundStat stat) {
+        add(stat, null);
+    }
+
+    /**
+     * Register a compound statistic with this sensor which yields multiple measurable quantities (like a histogram).
+     * One metric is registered for each named measurable the stat exposes.
+     * @param stat The stat to register
+     * @param config The configuration for this stat. If null then the stat will use the default configuration for this
+     *        sensor.
+     */
+    public synchronized void add(CompoundStat stat, MetricConfig config) {
+        this.stats.add(Utils.notNull(stat));
+        for (NamedMeasurable m : stat.stats()) {
+            KafkaMetric metric = new KafkaMetric(this, m.name(), m.description(), m.stat(), config == null ? this.config : config, time);
+            this.registry.registerMetric(metric);
+            this.metrics.add(metric);
+        }
+    }
+
+    /**
+     * Add a metric with default configuration and no description. Equivalent to
+     * {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, "", stat, null)}
+     *
+     */
+    public void add(String name, MeasurableStat stat) {
+        add(name, stat, null);
+    }
+
+    /**
+     * Add a metric with default configuration. Equivalent to
+     * {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, description, stat, null)}
+     *
+     */
+    public void add(String name, String description, MeasurableStat stat) {
+        add(name, description, stat, null);
+    }
+
+    /**
+     * Add a metric to this sensor with no description. Equivalent to
+     * {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, "", stat, config)}
+     * @param name The name of the metric
+     * @param stat The statistic to keep
+     * @param config A special configuration for this metric. If null use the sensor default configuration.
+     */
+    public void add(String name, MeasurableStat stat, MetricConfig config) {
+        add(name, "", stat, config);
+    }
+
+    /**
+     * Register a metric with this sensor
+     * @param name The name of the metric
+     * @param description A description used when reporting the value
+     * @param stat The statistic to keep
+     * @param config A special configuration for this metric. If null use the sensor default configuration.
+     */
+    public synchronized void add(String name, String description, MeasurableStat stat, MetricConfig config) {
+        KafkaMetric metric = new KafkaMetric(this,
+                                             Utils.notNull(name),
+                                             Utils.notNull(description),
+                                             Utils.notNull(stat),
+                                             config == null ? this.config : config,
+                                             time);
+        // register first: if the name is already taken this throws before the sensor's own state is touched
+        this.registry.registerMetric(metric);
+        this.metrics.add(metric);
+        this.stats.add(stat);
+    }
+
+    /**
+     * @return A read-only view of the metrics of this sensor
+     */
+    synchronized List<KafkaMetric> metrics() {
+        return Collections.unmodifiableList(this.metrics);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/Stat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Stat.java b/clients/src/main/java/kafka/common/metrics/Stat.java
new file mode 100644
index 0000000..8844545
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/Stat.java
@@ -0,0 +1,16 @@
+package kafka.common.metrics;
+
+/**
+ * A Stat is a quantity such as average, max, etc that is computed off the stream of updates to a sensor
+ */
+public interface Stat {
+
+    /**
+     * Feed one observed value into this statistic.
+     * @param config The configuration to use for this metric
+     * @param value The value to record
+     * @param time The time this value occurred
+     */
+    void record(MetricConfig config, double value, long time);
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Avg.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Avg.java b/clients/src/main/java/kafka/common/metrics/stats/Avg.java
new file mode 100644
index 0000000..b9d3d5d
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Avg.java
@@ -0,0 +1,33 @@
+package kafka.common.metrics.stats;
+
+import java.util.List;
+
+import kafka.common.metrics.MetricConfig;
+
+/**
+ * A {@link SampledStat} that maintains a simple average over its samples.
+ */
+public class Avg extends SampledStat {
+
+    /** Create an average whose samples start with a running total of zero. */
+    public Avg() {
+        super(0.0);
+    }
+
+    /** Add the recorded value to the running total of the given sample. */
+    @Override
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
+        sample.value = sample.value + value;
+    }
+
+    /**
+     * Divide the sum of all sample totals by the sum of all sample event counts.
+     */
+    @Override
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
+        double sum = 0.0;
+        long events = 0;
+        int idx = 0;
+        while (idx < samples.size()) {
+            Sample sample = samples.get(idx);
+            sum += sample.value;
+            events += sample.eventCount;
+            idx++;
+        }
+        return sum / events;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Count.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Count.java b/clients/src/main/java/kafka/common/metrics/stats/Count.java
new file mode 100644
index 0000000..3712e78
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Count.java
@@ -0,0 +1,29 @@
+package kafka.common.metrics.stats;
+
+import java.util.List;
+
+import kafka.common.metrics.MetricConfig;
+
+/**
+ * A {@link SampledStat} that maintains a simple count of what it has seen.
+ */
+public class Count extends SampledStat {
+
+    /** Create a count whose samples start at zero. */
+    public Count() {
+        super(0.0);
+    }
+
+    /** Bump the sample by one; the recorded value itself is ignored. */
+    @Override
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
+        sample.value = sample.value + 1.0;
+    }
+
+    /** Sum the counts of all samples. */
+    @Override
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
+        double sum = 0.0;
+        int idx = 0;
+        while (idx < samples.size()) {
+            sum += samples.get(idx).value;
+            idx++;
+        }
+        return sum;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Histogram.java b/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
new file mode 100644
index 0000000..c59b585
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
@@ -0,0 +1,137 @@
+package kafka.common.metrics.stats;
+
+/**
+ * A fixed-size histogram of recorded values. A {@link BinScheme} decides which bin each value falls into and which
+ * value each bin stands for when a quantile is read back.
+ */
+public class Histogram {
+
+    private final BinScheme binScheme;
+    /** Number of values recorded per bin. */
+    private final float[] hist;
+    /** Total number of values recorded since construction or the last {@link #clear()}. */
+    private double count;
+
+    /**
+     * @param binScheme The scheme defining the number of bins and the mapping between values and bins
+     */
+    public Histogram(BinScheme binScheme) {
+        this.hist = new float[binScheme.bins()];
+        this.count = 0.0f;
+        this.binScheme = binScheme;
+    }
+
+    /**
+     * Count the given value in the bin the scheme assigns it to.
+     */
+    public void record(double value) {
+        this.hist[binScheme.toBin(value)] += 1.0f;
+        this.count += 1.0f;
+    }
+
+    /**
+     * Estimate the value at the given quantile.
+     * @param quantile The quantile as a fraction, e.g. 0.5 for the median
+     * @return NaN if nothing has been recorded; otherwise the value the scheme maps to the first bin (the last bin
+     *         excluded) at which the cumulative fraction of recorded values exceeds the quantile, or positive
+     *         infinity if no such bin exists
+     */
+    public double value(double quantile) {
+        if (count == 0L)
+            return Double.NaN;
+        float sum = 0.0f;
+        float quant = (float) quantile;
+        for (int i = 0; i < this.hist.length - 1; i++) {
+            sum += this.hist[i];
+            if (sum / count > quant)
+                return binScheme.fromBin(i);
+        }
+        return Float.POSITIVE_INFINITY;
+    }
+
+    /**
+     * Reset every bin and the total count to zero.
+     */
+    public void clear() {
+        for (int i = 0; i < this.hist.length; i++)
+            this.hist[i] = 0.0f;
+        this.count = 0;
+    }
+
+    /**
+     * Render the histogram as <code>{binValue:count,...,Infinity:count}</code>.
+     */
+    @Override
+    public String toString() {
+        // Append the brace explicitly: new StringBuilder('{') would select the int-capacity constructor
+        // (the char is widened to 123) and the opening brace would never be written.
+        StringBuilder b = new StringBuilder("{");
+        for (int i = 0; i < this.hist.length - 1; i++) {
+            b.append(String.format("%.10f", binScheme.fromBin(i)));
+            b.append(':');
+            b.append(String.format("%.0f", this.hist[i]));
+            b.append(',');
+        }
+        b.append(Float.POSITIVE_INFINITY);
+        b.append(':');
+        b.append(this.hist[this.hist.length - 1]);
+        b.append('}');
+        return b.toString();
+    }
+
+    /**
+     * Maps values to bin indexes and bin indexes back to representative values.
+     */
+    public interface BinScheme {
+        /** @return The total number of bins */
+        public int bins();
+
+        /** @return The index of the bin the given value belongs to */
+        public int toBin(double value);
+
+        /** @return The value that represents the given bin */
+        public double fromBin(int bin);
+    }
+
+    /**
+     * A scheme with equally wide bins between min and max. Bin 0 takes values below min, the last bin takes values
+     * above max, and the remaining <code>bins - 2</code> bins evenly divide the range in between.
+     */
+    public static class ConstantBinScheme implements BinScheme {
+        private final double min;
+        private final double max;
+        private final int bins;
+        private final double bucketWidth;
+
+        /**
+         * @param bins The total number of bins, including the two overflow bins
+         * @param min The lower end of the evenly divided range
+         * @param max The upper end of the evenly divided range
+         * @throws IllegalArgumentException if fewer than 2 bins are requested
+         */
+        public ConstantBinScheme(int bins, double min, double max) {
+            if (bins < 2)
+                throw new IllegalArgumentException("Must have at least 2 bins.");
+            this.min = min;
+            this.max = max;
+            this.bins = bins;
+            this.bucketWidth = (max - min) / (bins - 2);
+        }
+
+        public int bins() {
+            return this.bins;
+        }
+
+        public double fromBin(int b) {
+            if (b == 0)
+                return Double.NEGATIVE_INFINITY;
+            else if (b == bins - 1)
+                return Double.POSITIVE_INFINITY;
+            else
+                return min + (b - 1) * bucketWidth;
+        }
+
+        public int toBin(double x) {
+            if (x < min)
+                return 0;
+            else if (x > max)
+                return bins - 1;
+            else
+                return (int) ((x - min) / bucketWidth) + 1;
+        }
+    }
+
+    /**
+     * A scheme for non-negative values whose bin boundaries grow with the bin index (boundaries follow scaled
+     * triangular numbers). The last bin takes values above max.
+     */
+    public static class LinearBinScheme implements BinScheme {
+        private final int bins;
+        private final double max;
+        private final double scale;
+
+        /**
+         * @param numBins The total number of bins
+         * @param max The largest value that is not put into the last (overflow) bin
+         */
+        public LinearBinScheme(int numBins, double max) {
+            this.bins = numBins;
+            this.max = max;
+            // Computed in double arithmetic: the int product numBins * (numBins - 1) overflows for large bin counts
+            this.scale = max / (numBins * (numBins - 1.0) / 2.0);
+        }
+
+        public int bins() {
+            return this.bins;
+        }
+
+        public double fromBin(int b) {
+            if (b == this.bins - 1) {
+                return Float.POSITIVE_INFINITY;
+            } else {
+                double unscaled = (b * (b - 1.0)) / 2.0;
+                return unscaled * this.scale;
+            }
+        }
+
+        /**
+         * @throws IllegalArgumentException if the value is negative
+         */
+        public int toBin(double x) {
+            if (x < 0.0d) {
+                throw new IllegalArgumentException("Values less than 0.0 not accepted.");
+            } else if (x > this.max) {
+                return this.bins - 1;
+            } else {
+                double scaled = x / this.scale;
+                // solves b * (b + 1) / 2 = scaled for b and truncates to the bin index
+                return (int) (-0.5 + Math.sqrt(2.0 * scaled + 0.25));
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Max.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Max.java b/clients/src/main/java/kafka/common/metrics/stats/Max.java
new file mode 100644
index 0000000..e7bd1d2
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Max.java
@@ -0,0 +1,29 @@
+package kafka.common.metrics.stats;
+
+import java.util.List;
+
+import kafka.common.metrics.MetricConfig;
+
+/**
+ * A {@link SampledStat} that gives the max over its samples.
+ */
+public final class Max extends SampledStat {
+
+    /** Create a max whose samples start at negative infinity, so any recorded value replaces the initial one. */
+    public Max() {
+        super(Double.NEGATIVE_INFINITY);
+    }
+
+    /** Keep the larger of the sample's current value and the recorded value. */
+    @Override
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
+        double larger = Math.max(sample.value, value);
+        sample.value = larger;
+    }
+
+    /** Return the largest value over all samples, or negative infinity if there are none. */
+    @Override
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
+        double largest = Double.NEGATIVE_INFINITY;
+        int idx = 0;
+        while (idx < samples.size()) {
+            largest = Math.max(largest, samples.get(idx).value);
+            idx++;
+        }
+        return largest;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Min.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Min.java b/clients/src/main/java/kafka/common/metrics/stats/Min.java
new file mode 100644
index 0000000..db0ab92
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Min.java
@@ -0,0 +1,29 @@
+package kafka.common.metrics.stats;
+
+import java.util.List;
+
+import kafka.common.metrics.MetricConfig;
+
+/**
+ * A {@link SampledStat} that gives the min over its samples.
+ */
+public class Min extends SampledStat {
+
+    /**
+     * Create a min whose samples start at {@link Double#MAX_VALUE} so that the first recorded value always replaces
+     * the initial one. {@link Double#MIN_VALUE} must not be used here: it is the smallest <i>positive</i> double, so
+     * no positive recorded value could ever be smaller than it.
+     */
+    public Min() {
+        super(Double.MAX_VALUE);
+    }
+
+    /** Keep the smaller of the sample's current value and the recorded value. */
+    @Override
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
+        sample.value = Math.min(sample.value, value);
+    }
+
+    /** Return the smallest value over all samples, or {@link Double#MAX_VALUE} if there are none. */
+    @Override
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
+        double min = Double.MAX_VALUE;
+        for (int i = 0; i < samples.size(); i++)
+            min = Math.min(min, samples.get(i).value);
+        return min;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Percentile.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Percentile.java b/clients/src/main/java/kafka/common/metrics/stats/Percentile.java
new file mode 100644
index 0000000..84320bb
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Percentile.java
@@ -0,0 +1,32 @@
+package kafka.common.metrics.stats;
+
+/**
+ * Describes one percentile to report: the name and description of the resulting metric and the percentile itself.
+ */
+public class Percentile {
+
+    private final String name;
+    private final String description;
+    private final double percentile;
+
+    /**
+     * Create a percentile with an empty description.
+     * @param name The name of the metric
+     * @param percentile The percentile to report
+     */
+    public Percentile(String name, double percentile) {
+        this(name, "", percentile);
+    }
+
+    /**
+     * @param name The name of the metric
+     * @param description The description of the metric
+     * @param percentile The percentile to report
+     */
+    public Percentile(String name, String description, double percentile) {
+        this.percentile = percentile;
+        this.description = description;
+        this.name = name;
+    }
+
+    /** @return The name of the metric */
+    public String name() {
+        return name;
+    }
+
+    /** @return The description of the metric */
+    public String description() {
+        return description;
+    }
+
+    /** @return The percentile to report */
+    public double percentile() {
+        return percentile;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
new file mode 100644
index 0000000..686c726
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
@@ -0,0 +1,76 @@
+package kafka.common.metrics.stats;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import kafka.common.metrics.CompoundStat;
+import kafka.common.metrics.Measurable;
+import kafka.common.metrics.MetricConfig;
+import kafka.common.metrics.stats.Histogram.BinScheme;
+import kafka.common.metrics.stats.Histogram.ConstantBinScheme;
+import kafka.common.metrics.stats.Histogram.LinearBinScheme;
+
+/**
+ * A compound stat that reports one or more percentiles
+ */
+public class Percentiles implements CompoundStat {
+
+    /** How the histogram bins are sized. */
+    public static enum BucketSizing {
+        CONSTANT, LINEAR
+    }
+
+    private final Percentile[] percentiles;
+    /** The histogram measurements are read from. */
+    private Histogram current;
+    /** A second histogram that is restarted part-way through a window and becomes current when the window ends. */
+    private Histogram shadow;
+    /** Timestamp of the record that started the current window. */
+    private long lastWindow;
+    /** Number of values recorded in the current window. */
+    private long eventCount;
+    /** Whether the shadow histogram has already been restarted at the half-way mark of the current window. */
+    private boolean shadowRestarted;
+
+    /**
+     * Create percentiles over the range 0.0 to max.
+     * @param sizeInBytes The memory budget per histogram; one bin is allocated per 4 bytes
+     * @param max The largest value that is not put into the overflow bin
+     * @param bucketing How the bins are sized
+     * @param percentiles The percentiles to report
+     */
+    public Percentiles(int sizeInBytes, double max, BucketSizing bucketing, Percentile... percentiles) {
+        this(sizeInBytes, 0.0, max, bucketing, percentiles);
+    }
+
+    /**
+     * @param sizeInBytes The memory budget per histogram; one bin is allocated per 4 bytes
+     * @param min The lower end of the histogram range; must be 0.0 for linear bucketing
+     * @param max The upper end of the histogram range
+     * @param bucketing How the bins are sized
+     * @param percentiles The percentiles to report
+     * @throws IllegalArgumentException if linear bucketing is used with a non-zero min, or the bucketing is not
+     *         supported
+     */
+    public Percentiles(int sizeInBytes, double min, double max, BucketSizing bucketing, Percentile... percentiles) {
+        this.percentiles = percentiles;
+        BinScheme scheme = null;
+        if (bucketing == BucketSizing.CONSTANT) {
+            scheme = new ConstantBinScheme(sizeInBytes / 4, min, max);
+        } else if (bucketing == BucketSizing.LINEAR) {
+            if (min != 0.0d)
+                throw new IllegalArgumentException("Linear bucket sizing requires min to be 0.0.");
+            scheme = new LinearBinScheme(sizeInBytes / 4, max);
+        } else {
+            // previously a null scheme reached the Histogram constructor and failed there with a NullPointerException
+            throw new IllegalArgumentException("Unsupported bucket sizing: " + bucketing);
+        }
+        this.current = new Histogram(scheme);
+        this.shadow = new Histogram(scheme);
+        this.eventCount = 0L;
+    }
+
+    /**
+     * @return One named measurable per configured percentile, each reading its value from the current histogram
+     */
+    @Override
+    public List<NamedMeasurable> stats() {
+        List<NamedMeasurable> ms = new ArrayList<NamedMeasurable>(this.percentiles.length);
+        for (Percentile percentile : this.percentiles) {
+            final double pct = percentile.percentile();
+            ms.add(new NamedMeasurable(percentile.name(), percentile.description(), new Measurable() {
+                public double measure(MetricConfig config, long now) {
+                    // percentiles are given on a 0-100 scale, the histogram expects a fraction
+                    return current.value(pct / 100.0);
+                }
+            }));
+        }
+        return ms;
+    }
+
+    /**
+     * Record the value in both histograms, rotating them when the configured time or event window is exceeded.
+     * <p>
+     * The window state ({@code lastWindow}, {@code eventCount}) is advanced here; without that every call would see
+     * an expired window and wipe both histograms, leaving only the latest value in them.
+     */
+    @Override
+    public void record(MetricConfig config, double value, long time) {
+        long elapsed = time - this.lastWindow;
+        if (elapsed > config.timeWindowNs() || this.eventCount > config.eventWindow()) {
+            // window complete: the shadow becomes current and a new window starts now
+            Histogram tmp = this.current;
+            this.current = this.shadow;
+            this.shadow = tmp;
+            this.shadow.clear();
+            this.lastWindow = time;
+            this.eventCount = 0L;
+            this.shadowRestarted = false;
+        } else if (!this.shadowRestarted
+                   && (elapsed > config.timeWindowNs() / 2 || this.eventCount > config.eventWindow() / 2)) {
+            // half-way through the window: restart the shadow once so that it only holds recent values at rotation
+            this.shadow.clear();
+            this.shadowRestarted = true;
+        }
+        this.current.record(value);
+        this.shadow.record(value);
+        this.eventCount++;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/kafka/common/metrics/stats/Rate.java
new file mode 100644
index 0000000..3f24a92
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Rate.java
@@ -0,0 +1,85 @@
+package kafka.common.metrics.stats;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import kafka.common.metrics.MeasurableStat;
+import kafka.common.metrics.MetricConfig;
+
+/**
+ * The rate of the given quantity. By default this is the total observed over a set of samples from a sampled statistic
+ * divided by the elapsed time over the sample windows. Alternative {@link SampledStat} implementations can be
+ * provided, however, to record the rate of occurrences (e.g. the count of values measured over the time interval) or
+ * other such values.
+ */
+public class Rate implements MeasurableStat {
+
+    /** The time unit the rate is expressed in. */
+    private final TimeUnit unit;
+    /** The sampled statistic that supplies the numerator of the rate. */
+    private final SampledStat stat;
+
+    /**
+     * Create a rate of the total recorded value per the given time unit.
+     * @param unit The time unit of the rate
+     */
+    public Rate(TimeUnit unit) {
+        this(unit, new SampledTotal());
+    }
+
+    /**
+     * Create a rate of the given sampled statistic per the given time unit.
+     * @param unit The time unit of the rate
+     * @param stat The statistic whose measurement is divided by the elapsed time
+     */
+    public Rate(TimeUnit unit, SampledStat stat) {
+        this.stat = stat;
+        this.unit = unit;
+    }
+
+    /**
+     * @return The singular, lower-case name of the time unit, e.g. "second" for {@link TimeUnit#SECONDS}
+     */
+    public String unitName() {
+        // Every TimeUnit name ends in a plural "S"; strip exactly that one character (stripping two gave "secon").
+        // Locale.ROOT keeps the result independent of the default locale's casing rules.
+        return unit.name().substring(0, unit.name().length() - 1).toLowerCase(java.util.Locale.ROOT);
+    }
+
+    @Override
+    public void record(MetricConfig config, double value, long time) {
+        this.stat.record(config, value, time);
+    }
+
+    /**
+     * Divide the measurement of the underlying statistic by the time that has passed since the window timestamp of
+     * its oldest sample, expressed in this rate's time unit.
+     */
+    @Override
+    public double measure(MetricConfig config, long now) {
+        double ellapsed = convert(now - stat.oldest().lastWindow);
+        return stat.measure(config, now) / ellapsed;
+    }
+
+    /**
+     * Convert a duration in nanoseconds into this rate's time unit, keeping the fractional part.
+     * @param time The duration in nanoseconds
+     */
+    private double convert(long time) {
+        switch (unit) {
+            case NANOSECONDS:
+                return time;
+            case MICROSECONDS:
+                return time / 1000.0;
+            case MILLISECONDS:
+                return time / (1000.0 * 1000.0);
+            case SECONDS:
+                return time / (1000.0 * 1000.0 * 1000.0);
+            case MINUTES:
+                return time / (60.0 * 1000.0 * 1000.0 * 1000.0);
+            case HOURS:
+                return time / (60.0 * 60.0 * 1000.0 * 1000.0 * 1000.0);
+            case DAYS:
+                return time / (24.0 * 60.0 * 60.0 * 1000.0 * 1000.0 * 1000.0);
+            default:
+                throw new IllegalStateException("Unknown unit: " + unit);
+        }
+    }
+
+    /**
+     * A {@link SampledStat} that sums up every recorded value.
+     */
+    public static class SampledTotal extends SampledStat {
+
+        public SampledTotal() {
+            super(0.0d);
+        }
+
+        /** Add the recorded value to the running total of the given sample. */
+        @Override
+        protected void update(Sample sample, MetricConfig config, double value, long now) {
+            sample.value += value;
+        }
+
+        /** Sum the totals of all samples. */
+        @Override
+        public double combine(List<Sample> samples, MetricConfig config, long now) {
+            double total = 0.0;
+            for (int i = 0; i < samples.size(); i++)
+                total += samples.get(i).value;
+            return total;
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
new file mode 100644
index 0000000..6f820fa
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
@@ -0,0 +1,106 @@
+package kafka.common.metrics.stats;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import kafka.common.metrics.MeasurableStat;
+import kafka.common.metrics.MetricConfig;
+
+/**
+ * A SampledStat records a single scalar value measured over one or more samples. Each sample is recorded over a
+ * configurable window. The window can be defined by number of events or elapsed time (or both; if both are given the
+ * window is complete when <i>either</i> the event count or elapsed time criterion is met).
+ * <p>
+ * All the samples are combined to produce the measurement. When a window is complete the oldest sample is cleared and
+ * recycled to begin recording the next sample.
+ * <p>
+ * Subclasses of this class define different statistics measured using this basic pattern.
+ */
+public abstract class SampledStat implements MeasurableStat {
+
+    private final double initialValue;
+    private final List<Sample> samples;
+    private int current = 0;
+
+    /**
+     * @param initialValue The value every sample starts from and is reset to
+     */
+    public SampledStat(double initialValue) {
+        this.initialValue = initialValue;
+        this.samples = new ArrayList<Sample>(2);
+    }
+
+    /**
+     * Record a value into the current sample, first moving on to the next sample if the current window is complete.
+     */
+    @Override
+    public void record(MetricConfig config, double value, long now) {
+        Sample sample = current(now);
+        if (sample.isComplete(now, config))
+            sample = advance(config, now);
+        update(sample, config, value, now);
+        sample.eventCount += 1;
+    }
+
+    /**
+     * Move to the next sample slot, creating the sample if that slot has never been used and recycling it otherwise.
+     */
+    private Sample advance(MetricConfig config, long now) {
+        this.current = (this.current + 1) % config.samples();
+        if (this.current >= samples.size()) {
+            Sample sample = new Sample(this.initialValue, now);
+            this.samples.add(sample);
+            return sample;
+        } else {
+            Sample sample = current(now);
+            sample.reset(now);
+            return sample;
+        }
+    }
+
+    /**
+     * Reset any samples that have become obsolete and combine the remaining samples into a single measurement.
+     */
+    @Override
+    public double measure(MetricConfig config, long now) {
+        timeoutObsoleteSamples(config, now);
+        return combine(this.samples, config, now);
+    }
+
+    /**
+     * The sample currently being recorded into; the first sample is created on demand.
+     */
+    public Sample current(long now) {
+        if (samples.size() == 0)
+            this.samples.add(new Sample(initialValue, now));
+        return this.samples.get(this.current);
+    }
+
+    /**
+     * The sample in the slot following the current one, i.e. the next sample to be recycled.
+     * @throws IllegalStateException if no sample exists yet because nothing has been recorded
+     */
+    public Sample oldest() {
+        // without this check the modulo below would fail with an unhelpful "/ by zero"
+        if (this.samples.isEmpty())
+            throw new IllegalStateException("No samples exist yet; a value must be recorded before asking for the oldest sample.");
+        return this.samples.get((this.current + 1) % this.samples.size());
+    }
+
+    /**
+     * Fold a newly recorded value into the given sample.
+     */
+    protected abstract void update(Sample sample, MetricConfig config, double value, long now);
+
+    /**
+     * Combine all the samples into the value reported by {@link #measure(MetricConfig, long)}.
+     */
+    public abstract double combine(List<Sample> samples, MetricConfig config, long now);
+
+    /*
+     * Timeout any windows that have expired in the absence of any events. A sample is only obsolete once it is older
+     * than the full retention horizon (number of samples times the window length); expiring the current sample after a
+     * single window would throw away data that is still inside the measured horizon.
+     */
+    private void timeoutObsoleteSamples(MetricConfig config, long now) {
+        for (Sample sample : this.samples) {
+            if (now - sample.lastWindow >= config.samples() * config.timeWindowNs())
+                sample.reset(now);
+        }
+    }
+
+    /**
+     * The state of a single sample window: its value, the number of events recorded and the time it was started.
+     */
+    protected static class Sample {
+        public double initialValue;
+        public long eventCount;
+        public long lastWindow;
+        public double value;
+
+        public Sample(double initialValue, long now) {
+            this.initialValue = initialValue;
+            this.eventCount = 0;
+            this.lastWindow = now;
+            this.value = initialValue;
+        }
+
+        /** Clear this sample and restart its window at the given time. */
+        public void reset(long now) {
+            this.eventCount = 0;
+            this.lastWindow = now;
+            this.value = initialValue;
+        }
+
+        /** Has either the time window elapsed or the event window been filled? */
+        public boolean isComplete(long now, MetricConfig config) {
+            return now - lastWindow >= config.timeWindowNs() || eventCount >= config.eventWindow();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/metrics/stats/Total.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Total.java b/clients/src/main/java/kafka/common/metrics/stats/Total.java
new file mode 100644
index 0000000..c87b1ba
--- /dev/null
+++ b/clients/src/main/java/kafka/common/metrics/stats/Total.java
@@ -0,0 +1,31 @@
+package kafka.common.metrics.stats;
+
+import kafka.common.metrics.MeasurableStat;
+import kafka.common.metrics.MetricConfig;
+
+/**
+ * A cumulative total of every recorded value; it is not windowed and is never reset.
+ */
+public class Total implements MeasurableStat {
+
+    private double total;
+
+    /** Create a total that starts at zero. */
+    public Total() {
+        this(0.0);
+    }
+
+    /** Create a total that starts at the given value. */
+    public Total(double value) {
+        this.total = value;
+    }
+
+    /** Add the value to the total; the config and the timestamp are not used. */
+    @Override
+    public void record(MetricConfig config, double value, long time) {
+        total = total + value;
+    }
+
+    /** The total accumulated so far. */
+    @Override
+    public double measure(MetricConfig config, long now) {
+        return total;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/ByteBufferReceive.java b/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
new file mode 100644
index 0000000..cb1aaae
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
@@ -0,0 +1,43 @@
+package kafka.common.network;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ScatteringByteChannel;
+
+/**
+ * A receive backed by an array of ByteBuffers
+ */
+public class ByteBufferReceive implements Receive {
+
+ private final int source;
+ private final ByteBuffer[] buffers;
+ private int remaining;
+
+ public ByteBufferReceive(int source, ByteBuffer... buffers) {
+ super();
+ this.source = source;
+ this.buffers = buffers;
+ for (int i = 0; i < buffers.length; i++)
+ remaining += buffers[i].remaining();
+ }
+
+ @Override
+ public int source() {
+ return source;
+ }
+
+ @Override
+ public boolean complete() {
+ return remaining > 0;
+ }
+
+ @Override
+ public long readFrom(ScatteringByteChannel channel) throws IOException {
+ return channel.read(buffers);
+ }
+
+ public ByteBuffer[] reify() {
+ return buffers;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/kafka/common/network/ByteBufferSend.java
new file mode 100644
index 0000000..43bf963
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/ByteBufferSend.java
@@ -0,0 +1,54 @@
+package kafka.common.network;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+
+/**
+ * A send backed by an array of byte buffers
+ */
+public class ByteBufferSend implements Send {
+
+    private final int destination;
+    protected final ByteBuffer[] buffers;
+    private int remaining;
+
+    /**
+     * @param destination The id of the destination of this send
+     * @param buffers The buffers to send; their combined remaining bytes make up the size of the send
+     */
+    public ByteBufferSend(int destination, ByteBuffer... buffers) {
+        this.destination = destination;
+        this.buffers = buffers;
+        for (ByteBuffer buffer : buffers)
+            this.remaining += buffer.remaining();
+    }
+
+    @Override
+    public int destination() {
+        return destination;
+    }
+
+    /**
+     * The send is complete once there are no bytes left to write.
+     */
+    @Override
+    public boolean complete() {
+        return remaining <= 0;
+    }
+
+    @Override
+    public ByteBuffer[] reify() {
+        return this.buffers;
+    }
+
+    @Override
+    public int remaining() {
+        return this.remaining;
+    }
+
+    /**
+     * Write as many of the outstanding bytes as the channel accepts and account for them.
+     * @return The number of bytes written by this call
+     * @throws EOFException if the channel reports a negative number of bytes written
+     */
+    @Override
+    public long writeTo(GatheringByteChannel channel) throws IOException {
+        long written = channel.write(buffers);
+        if (written < 0)
+            throw new EOFException("This shouldn't happen.");
+        remaining -= written;
+        return written;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/NetworkReceive.java b/clients/src/main/java/kafka/common/network/NetworkReceive.java
new file mode 100644
index 0000000..68ae48e
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/NetworkReceive.java
@@ -0,0 +1,74 @@
+package kafka.common.network;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ScatteringByteChannel;
+
+/**
+ * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
+ */
+public class NetworkReceive implements Receive {
+
+    private final int source;
+    private final ByteBuffer size;
+    private ByteBuffer buffer;
+
+    /**
+     * Create a receive that reads straight into the given buffer, with no size prefix expected on the channel.
+     * @param source The id of the source the data is received from
+     * @param buffer The buffer to fill
+     */
+    public NetworkReceive(int source, ByteBuffer buffer) {
+        this.source = source;
+        this.buffer = buffer;
+        // an empty size buffer counts as "size already read"; a null here made complete() and readFrom() throw
+        this.size = ByteBuffer.allocate(0);
+    }
+
+    /**
+     * Create a receive that first reads the 4 byte size and then allocates a buffer of that size for the content.
+     * @param source The id of the source the data is received from
+     */
+    public NetworkReceive(int source) {
+        this.source = source;
+        this.size = ByteBuffer.allocate(4);
+        this.buffer = null;
+    }
+
+    @Override
+    public int source() {
+        return source;
+    }
+
+    /**
+     * The receive is complete once both the size and the content have been read in full.
+     */
+    @Override
+    public boolean complete() {
+        return !size.hasRemaining() && !buffer.hasRemaining();
+    }
+
+    @Override
+    public ByteBuffer[] reify() {
+        return new ByteBuffer[] { this.buffer };
+    }
+
+    /**
+     * Read whatever is available: first the outstanding bytes of the size prefix, then the content.
+     * @return The number of bytes read by this call (size prefix and content combined)
+     * @throws EOFException if the channel has reached end-of-stream
+     * @throws IllegalStateException if the size prefix is negative
+     */
+    @Override
+    public long readFrom(ScatteringByteChannel channel) throws IOException {
+        int read = 0;
+        if (size.hasRemaining()) {
+            int bytesRead = channel.read(size);
+            if (bytesRead < 0)
+                throw new EOFException();
+            read += bytesRead;
+            if (!size.hasRemaining()) {
+                // the size prefix is complete: decode it and allocate the content buffer
+                size.rewind();
+                int requestSize = size.getInt();
+                if (requestSize < 0)
+                    throw new IllegalStateException("Invalid request (size = " + requestSize + ")");
+                this.buffer = ByteBuffer.allocate(requestSize);
+            }
+        }
+        if (buffer != null) {
+            int bytesRead = channel.read(buffer);
+            if (bytesRead < 0)
+                throw new EOFException();
+            read += bytesRead;
+        }
+
+        return read;
+    }
+
+    /**
+     * The content buffer, or null if the size prefix has not been read in full yet.
+     */
+    public ByteBuffer payload() {
+        return this.buffer;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/NetworkSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/NetworkSend.java b/clients/src/main/java/kafka/common/network/NetworkSend.java
new file mode 100644
index 0000000..4e4ac98
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/NetworkSend.java
@@ -0,0 +1,26 @@
+package kafka.common.network;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A size delimited Send that consists of a 4 byte network-ordered size N followed by N bytes of content
+ */
+public class NetworkSend extends ByteBufferSend {
+
+ public NetworkSend(int destination, ByteBuffer... buffers) {
+ super(destination, sizeDelimit(buffers));
+ }
+
+ private static ByteBuffer[] sizeDelimit(ByteBuffer[] buffers) {
+ int size = 0;
+ for (int i = 0; i < buffers.length; i++)
+ size += buffers[i].remaining();
+ ByteBuffer[] delimited = new ByteBuffer[buffers.length + 1];
+ delimited[0] = ByteBuffer.allocate(4);
+ delimited[0].putInt(size);
+ delimited[0].rewind();
+ System.arraycopy(buffers, 0, delimited, 1, buffers.length);
+ return delimited;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/Receive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Receive.java b/clients/src/main/java/kafka/common/network/Receive.java
new file mode 100644
index 0000000..40ee942
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/Receive.java
@@ -0,0 +1,35 @@
+package kafka.common.network;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ScatteringByteChannel;
+
+/**
+ * This interface models the in-progress reading of data from a channel, where the origin of the data is a source
+ * identified by an integer id
+ */
+public interface Receive {
+
+    /**
+     * The numeric id of the source from which we are receiving data.
+     */
+    int source();
+
+    /**
+     * Are we done receiving data?
+     */
+    boolean complete();
+
+    /**
+     * Turn this receive into ByteBuffer instances, if possible (otherwise returns null).
+     */
+    ByteBuffer[] reify();
+
+    /**
+     * Read bytes into this receive from the given channel
+     * @param channel The channel to read from
+     * @return The number of bytes read
+     * @throws IOException If the reading fails
+     */
+    long readFrom(ScatteringByteChannel channel) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Selectable.java b/clients/src/main/java/kafka/common/network/Selectable.java
new file mode 100644
index 0000000..794fc60
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/Selectable.java
@@ -0,0 +1,68 @@
+package kafka.common.network;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * An interface for asynchronous, multi-channel network I/O
+ */
+public interface Selectable {
+
+    /**
+     * Begin establishing a socket connection to the given address, identified by the given id
+     * @param id The id for this connection
+     * @param address The address to connect to
+     * @param sendBufferSize The send buffer for the socket
+     * @param receiveBufferSize The receive buffer for the socket
+     * @throws IOException If we cannot begin connecting
+     */
+    void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
+
+    /**
+     * Begin disconnecting the connection identified by the given id
+     */
+    void disconnect(int id);
+
+    /**
+     * Wakeup this selector if it is blocked on I/O
+     */
+    void wakeup();
+
+    /**
+     * Close this selector
+     */
+    void close();
+
+    /**
+     * Initiate any sends provided, and make progress on any other I/O operations in-flight (connections,
+     * disconnections, existing sends, and receives)
+     * @param timeout The amount of time to block if there is nothing to do
+     * @param sends The new sends to initiate
+     * @throws IOException If the underlying I/O fails
+     */
+    void poll(long timeout, List<NetworkSend> sends) throws IOException;
+
+    /**
+     * The list of sends that completed on the last {@link #poll(long, List) poll()} call.
+     */
+    List<NetworkSend> completedSends();
+
+    /**
+     * The list of receives that completed on the last {@link #poll(long, List) poll()} call.
+     */
+    List<NetworkReceive> completedReceives();
+
+    /**
+     * The list of connections that finished disconnecting on the last {@link #poll(long, List) poll()} call.
+     */
+    List<Integer> disconnected();
+
+    /**
+     * The list of connections that completed their connection on the last {@link #poll(long, List) poll()} call.
+     */
+    List<Integer> connected();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Selector.java b/clients/src/main/java/kafka/common/network/Selector.java
new file mode 100644
index 0000000..f53060c
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/Selector.java
@@ -0,0 +1,349 @@
+package kafka.common.network;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import kafka.common.KafkaException;
+
+/**
+ * A selector interface for doing non-blocking multi-connection network I/O.
+ * <p>
+ * This class works with {@link NetworkSend} and {@link NetworkReceive} to transmit size-delimited network requests and
+ * responses.
+ * <p>
+ * A connection can be added to the selector associated with an integer id by doing
+ *
+ * <pre>
+ * selector.connect(42, new InetSocketAddress("google.com", server.port), 64000, 64000);
+ * </pre>
+ *
+ * The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating
+ * the connection. The successful invocation of this method does not mean a valid connection has been established.
+ *
+ * Sending requests, receiving responses, processing connection completions, and disconnections on the existing
+ * connections are all done using the <code>poll()</code> call.
+ *
+ * <pre>
+ * List<NetworkRequest> requestsToSend = Arrays.asList(new NetworkRequest(0, myBytes), new NetworkRequest(1, myOtherBytes));
+ * selector.poll(TIMEOUT_MS, requestsToSend);
+ * </pre>
+ *
+ * The selector maintains several lists that are reset by each call to <code>poll()</code> which are available via
+ * various getters. These are reset by each call to <code>poll()</code>.
+ *
+ * This class is not thread safe!
+ */
+public class Selector implements Selectable {
+
+ private final java.nio.channels.Selector selector;
+ private final Map<Integer, SelectionKey> keys;
+ private final List<NetworkSend> completedSends;
+ private final List<NetworkReceive> completedReceives;
+ private final List<Integer> disconnected;
+ private final List<Integer> connected;
+
+ /**
+ * Create a new selector
+ */
+ public Selector() {
+ try {
+ this.selector = java.nio.channels.Selector.open();
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ this.keys = new HashMap<Integer, SelectionKey>();
+ this.completedSends = new ArrayList<NetworkSend>();
+ this.completedReceives = new ArrayList<NetworkReceive>();
+ this.connected = new ArrayList<Integer>();
+ this.disconnected = new ArrayList<Integer>();
+ }
+
+ /**
+ * Begin connecting to the given address and add the connection to this selector associated with the given id
+ * number.
+ * <p>
+ * Note that this call only initiates the connection, which will be completed on a future {@link #poll(long, List)}
+ * call. Check {@link #connected()} to see which (if any) connections have completed after a given poll call.
+ * @param id The id for the new connection
+ * @param address The address to connect to
+ * @param sendBufferSize The send buffer for the new connection
+ * @param receiveBufferSize The receive buffer for the new connection
+ * @throws IllegalStateException if there is already a connection for that id
+ * @throws UnresolvedAddressException if DNS resolution fails on the hostname
+ */
+ @Override
+ public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false);
+ Socket socket = channel.socket();
+ socket.setKeepAlive(true);
+ socket.setSendBufferSize(sendBufferSize);
+ socket.setReceiveBufferSize(receiveBufferSize);
+ socket.setTcpNoDelay(true);
+ try {
+ channel.connect(address);
+ } catch (UnresolvedAddressException e) {
+ channel.close();
+ throw e;
+ }
+ SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT);
+ key.attach(new Transmissions(id));
+ if (this.keys.containsKey(key))
+ throw new IllegalStateException("There is already a connection for id " + id);
+ this.keys.put(id, key);
+ }
+
+ /**
+ * Disconnect any connections for the given id (if there are any). The disconnection is asynchronous and will not be
+ * processed until the next {@link #poll(long, List) poll()} call.
+ */
+ @Override
+ public void disconnect(int id) {
+ SelectionKey key = this.keys.get(id);
+ if (key != null)
+ key.cancel();
+ }
+
+ /**
+ * Interrupt the selector if it is blocked waiting to do I/O.
+ */
+ @Override
+ public void wakeup() {
+ this.selector.wakeup();
+ }
+
+ /**
+ * Close this selector and all associated connections
+ */
+ @Override
+ public void close() {
+ for (SelectionKey key : this.selector.keys()) {
+ try {
+ close(key);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ try {
+ this.selector.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
+ * disconnections, initiating new sends, or making progress on in-progress sends or receives.
+ * <p>
+ * The provided network sends will be started.
+ *
+ * When this call is completed the user can check for completed sends, receives, connections or disconnects using
+ * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
+ * lists will be cleared at the beginning of each {@link #poll(long, List)} call and repopulated by the call if any
+ * completed I/O.
+ *
+ * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely.
+ * @param sends The list of new sends to begin
+ *
+ * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
+ * already an in-progress send
+ */
+ @Override
+ public void poll(long timeout, List<NetworkSend> sends) throws IOException {
+ clear();
+
+ /* register for write interest on any new sends */
+ for (NetworkSend send : sends) {
+ SelectionKey key = keyForId(send.destination());
+ Transmissions transmissions = transmissions(key);
+ if (transmissions.hasSend())
+ throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
+ transmissions.send = send;
+ try {
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ } catch (CancelledKeyException e) {
+ close(key);
+ }
+ }
+
+ /* check ready keys */
+ int readyKeys = select(timeout);
+ if (readyKeys > 0) {
+ Set<SelectionKey> keys = this.selector.selectedKeys();
+ Iterator<SelectionKey> iter = keys.iterator();
+ while (iter.hasNext()) {
+ SelectionKey key = iter.next();
+ iter.remove();
+
+ Transmissions transmissions = transmissions(key);
+ SocketChannel channel = channel(key);
+ try {
+ /*
+ * complete any connections that have finished their handshake
+ */
+ if (key.isConnectable()) {
+ channel.finishConnect();
+ key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
+ this.connected.add(transmissions.id);
+ }
+
+ /* read from any connections that have readable data */
+ if (key.isReadable()) {
+ if (!transmissions.hasReceive())
+ transmissions.receive = new NetworkReceive(transmissions.id);
+ transmissions.receive.readFrom(channel);
+ if (transmissions.receive.complete()) {
+ transmissions.receive.payload().rewind();
+ this.completedReceives.add(transmissions.receive);
+ transmissions.clearReceive();
+ }
+ }
+
+ /*
+ * write to any sockets that have space in their buffer and for which we have data
+ */
+ if (key.isWritable()) {
+ transmissions.send.writeTo(channel);
+ if (transmissions.send.remaining() <= 0) {
+ this.completedSends.add(transmissions.send);
+ transmissions.clearSend();
+ key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+ }
+ }
+
+ /* cancel any defunct sockets */
+ if (!key.isValid())
+ close(key);
+ } catch (IOException e) {
+ e.printStackTrace();
+ close(key);
+ }
+ }
+ }
+ }
+
+ @Override
+ public List<NetworkSend> completedSends() {
+ return this.completedSends;
+ }
+
+ @Override
+ public List<NetworkReceive> completedReceives() {
+ return this.completedReceives;
+ }
+
+ @Override
+ public List<Integer> disconnected() {
+ return this.disconnected;
+ }
+
+ @Override
+ public List<Integer> connected() {
+ return this.connected;
+ }
+
+ /**
+ * Clear the results from the prior poll
+ */
+ private void clear() {
+ this.completedSends.clear();
+ this.completedReceives.clear();
+ this.connected.clear();
+ this.disconnected.clear();
+ }
+
+ /**
+ * Check for data, waiting up to the given timeout.
+ *
+ * @param ms Length of time to wait, in milliseconds. If negative, wait indefinitely.
+ * @return The number of keys ready
+ * @throws IOException
+ */
+ private int select(long ms) throws IOException {
+ if (ms == 0L)
+ return this.selector.selectNow();
+ else if (ms < 0L)
+ return this.selector.select();
+ else
+ return this.selector.select(ms);
+ }
+
+ /**
+ * Begin closing this connection
+ */
+ private void close(SelectionKey key) throws IOException {
+ SocketChannel channel = channel(key);
+ Transmissions trans = transmissions(key);
+ if (trans != null)
+ this.disconnected.add(trans.id);
+ key.attach(null);
+ key.cancel();
+ channel.socket().close();
+ channel.close();
+ }
+
+ /**
+ * Get the selection key associated with this numeric id
+ */
+ private SelectionKey keyForId(int id) {
+ SelectionKey key = this.keys.get(id);
+ if (key == null)
+ throw new IllegalStateException("Attempt to write to socket for which there is no open connection.");
+ return key;
+ }
+
+ /**
+ * Get the transmissions for the given connection
+ */
+ private Transmissions transmissions(SelectionKey key) {
+ return (Transmissions) key.attachment();
+ }
+
+ /**
+ * Get the socket channel associated with this selection key
+ */
+ private SocketChannel channel(SelectionKey key) {
+ return (SocketChannel) key.channel();
+ }
+
+ /**
+ * The id and in-progress send and receive associated with a connection
+ */
+ private static class Transmissions {
+ public int id;
+ public NetworkSend send;
+ public NetworkReceive receive;
+
+ public Transmissions(int id) {
+ this.id = id;
+ }
+
+ public boolean hasSend() {
+ return this.send != null;
+ }
+
+ public void clearSend() {
+ this.send = null;
+ }
+
+ public boolean hasReceive() {
+ return this.receive != null;
+ }
+
+ public void clearReceive() {
+ this.receive = null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/network/Send.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Send.java b/clients/src/main/java/kafka/common/network/Send.java
new file mode 100644
index 0000000..e7ef68a
--- /dev/null
+++ b/clients/src/main/java/kafka/common/network/Send.java
@@ -0,0 +1,41 @@
+package kafka.common.network;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+
+/**
+ * This interface models the in-progress sending of data to a destination identified by an integer id.
+ */
+public interface Send {
+
+    /**
+     * The numeric id for the destination of this send
+     */
+    int destination();
+
+    /**
+     * The number of bytes remaining to send
+     */
+    int remaining();
+
+    /**
+     * Is this send complete?
+     */
+    boolean complete();
+
+    /**
+     * An optional method to turn this send into an array of ByteBuffers if possible (otherwise returns null)
+     */
+    ByteBuffer[] reify();
+
+    /**
+     * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
+     * to be completely written
+     * @param channel The channel to write to
+     * @return The number of bytes written
+     * @throws IOException If the write fails
+     */
+    long writeTo(GatheringByteChannel channel) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/kafka/common/protocol/ApiKeys.java
new file mode 100644
index 0000000..1e2f8bb
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/ApiKeys.java
@@ -0,0 +1,35 @@
+package kafka.common.protocol;
+
+/**
+ * Identifiers for all the Kafka APIs
+ */
+public enum ApiKeys {
+ PRODUCE(0, "produce"),
+ FETCH(1, "fetch"),
+ LIST_OFFSETS(2, "list_offsets"),
+ METADATA(3, "metadata"),
+ LEADER_AND_ISR(4, "leader_and_isr"),
+ STOP_REPLICA(5, "stop_replica"),
+ OFFSET_COMMIT(6, "offset_commit"),
+ OFFSET_FETCH(7, "offset_fetch");
+
+ public static int MAX_API_KEY = 0;
+
+ static {
+ for (ApiKeys key : ApiKeys.values()) {
+ MAX_API_KEY = Math.max(MAX_API_KEY, key.id);
+ }
+ }
+
+ /** the perminant and immutable id of an API--this can't change ever */
+ public final short id;
+
+ /** an english description of the api--this is for debugging and can change */
+ public final String name;
+
+ private ApiKeys(int id, String name) {
+ this.id = (short) id;
+ this.name = name;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/Errors.java b/clients/src/main/java/kafka/common/protocol/Errors.java
new file mode 100644
index 0000000..fb1a3e5
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/Errors.java
@@ -0,0 +1,97 @@
+package kafka.common.protocol;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import kafka.common.errors.ApiException;
+import kafka.common.errors.CorruptMessageException;
+import kafka.common.errors.LeaderNotAvailableException;
+import kafka.common.errors.MessageTooLargeException;
+import kafka.common.errors.NetworkException;
+import kafka.common.errors.NotLeaderForPartitionException;
+import kafka.common.errors.OffsetMetadataTooLarge;
+import kafka.common.errors.OffsetOutOfRangeException;
+import kafka.common.errors.TimeoutException;
+import kafka.common.errors.UnknownServerException;
+import kafka.common.errors.UnknownTopicOrPartitionException;
+
+/**
+ * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
+ * are thus part of the protocol. The names can be changed but the error code cannot.
+ *
+ * Do not add exceptions that occur only on the client or only on the server here.
+ */
+public enum Errors {
+    UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
+    NONE(0, null),
+    OFFSET_OUT_OF_RANGE(1,
+                        new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
+    CORRUPT_MESSAGE(2,
+                    new CorruptMessageException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
+    UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
+    LEADER_NOT_AVAILABLE(5,
+                         new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
+    NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
+    REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
+    MESSAGE_TOO_LARGE(10,
+                      new MessageTooLargeException("The request included a message larger than the max message size the server will accept.")),
+    OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
+    NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received."));
+
+    /* lookup tables, filled once when the enum is initialized and only read afterwards */
+    private static final Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
+    private static final Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
+    static {
+        for (Errors error : Errors.values()) {
+            codeToError.put(error.code(), error);
+            if (error.exception != null)
+                classToError.put(error.exception.getClass(), error);
+        }
+
+    }
+
+    private final short code;
+    private final ApiException exception;
+
+    private Errors(int code, ApiException exception) {
+        this.code = (short) code;
+        this.exception = exception;
+    }
+
+    /**
+     * An instance of the exception (null for {@link #NONE}). The same instance is returned on every call.
+     */
+    public ApiException exception() {
+        return this.exception;
+    }
+
+    /**
+     * The error code for the exception
+     */
+    public short code() {
+        return this.code;
+    }
+
+    /**
+     * Throw the exception corresponding to this error if there is one
+     */
+    public void maybeThrow() {
+        if (exception != null)
+            throw this.exception;
+    }
+
+    /**
+     * Return the error instance associated with this error code (or UNKNOWN if the code is not recognized)
+     */
+    public static Errors forCode(short code) {
+        Errors error = codeToError.get(code);
+        return error == null ? UNKNOWN : error;
+    }
+
+    /**
+     * Return the error instance associated with this exception (or UNKNOWN if there is none). Only the exact class of
+     * the exception is looked up; its superclasses are not considered.
+     */
+    public static Errors forException(Throwable t) {
+        Errors error = classToError.get(t.getClass());
+        return error == null ? UNKNOWN : error;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
new file mode 100644
index 0000000..83dad53
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
@@ -0,0 +1,95 @@
+package kafka.common.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import kafka.common.Cluster;
+import kafka.common.Node;
+import kafka.common.PartitionInfo;
+import kafka.common.protocol.types.Schema;
+import kafka.common.protocol.types.Struct;
+
+/**
+ * Static helpers for looking up protocol schemas by api key and version, and for parsing requests and responses with
+ * those schemas.
+ */
+public class ProtoUtils {
+
+    /**
+     * Look up a schema in a table indexed first by api key and then by version.
+     *
+     * @param schemas the table of schemas, one array of versions per api key
+     * @param apiKey the api key to look up
+     * @param version the version of the api to look up
+     * @return the schema registered for the given api key and version
+     * @throws IllegalArgumentException if the api key or the version is outside the table
+     */
+    private static Schema schemaFor(Schema[][] schemas, int apiKey, int version) {
+        // valid indices run from 0 to length - 1, so an index equal to the length must be rejected as well
+        if (apiKey < 0 || apiKey >= schemas.length)
+            throw new IllegalArgumentException("Invalid api key: " + apiKey);
+        Schema[] versions = schemas[apiKey];
+        // a slot that was never registered is reported the same way as an out-of-range version
+        if (versions == null || version < 0 || version >= versions.length)
+            throw new IllegalArgumentException("Invalid version for API key " + apiKey + ": " + version);
+        return versions[version];
+    }
+
+    /**
+     * The latest version recorded for the given api key in Protocol.CURR_VERSION.
+     *
+     * @throws IllegalArgumentException if the api key is outside Protocol.CURR_VERSION
+     */
+    public static short latestVersion(int apiKey) {
+        if (apiKey < 0 || apiKey >= Protocol.CURR_VERSION.length)
+            throw new IllegalArgumentException("Invalid api key: " + apiKey);
+        return Protocol.CURR_VERSION[apiKey];
+    }
+
+    /**
+     * The request schema for the given api key and version.
+     *
+     * @throws IllegalArgumentException if the api key or version is invalid
+     */
+    public static Schema requestSchema(int apiKey, int version) {
+        return schemaFor(Protocol.REQUESTS, apiKey, version);
+    }
+
+    /**
+     * The request schema for the latest version of the given api key.
+     */
+    public static Schema currentRequestSchema(int apiKey) {
+        return requestSchema(apiKey, latestVersion(apiKey));
+    }
+
+    /**
+     * The response schema for the given api key and version.
+     *
+     * @throws IllegalArgumentException if the api key or version is invalid
+     */
+    public static Schema responseSchema(int apiKey, int version) {
+        return schemaFor(Protocol.RESPONSES, apiKey, version);
+    }
+
+    /**
+     * The response schema for the latest version of the given api key.
+     */
+    public static Schema currentResponseSchema(int apiKey) {
+        return responseSchema(apiKey, latestVersion(apiKey));
+    }
+
+    /**
+     * Read a request of the given api key and version from the buffer.
+     */
+    public static Struct parseRequest(int apiKey, int version, ByteBuffer buffer) {
+        return (Struct) requestSchema(apiKey, version).read(buffer);
+    }
+
+    /**
+     * Read a response for the given api key from the buffer using the latest response schema for that api.
+     */
+    public static Struct parseResponse(int apiKey, ByteBuffer buffer) {
+        return (Struct) currentResponseSchema(apiKey).read(buffer);
+    }
+
+    /**
+     * Build a Cluster from a parsed metadata response.
+     * <p>
+     * Topics whose topic_error_code is not Errors.NONE are skipped entirely, and within the remaining topics any
+     * partition whose partition_error_code is not Errors.NONE is skipped.
+     *
+     * @param response the metadata response struct, with "brokers" and "topic_metadata" fields
+     * @return a cluster made of all listed brokers and the partitions that were reported without error
+     */
+    public static Cluster parseMetadataResponse(Struct response) {
+        List<Node> brokers = new ArrayList<Node>();
+        for (Object brokerStruct : (Object[]) response.get("brokers")) {
+            Struct broker = (Struct) brokerStruct;
+            int nodeId = broker.getInt("node_id");
+            String host = broker.getString("host");
+            int port = broker.getInt("port");
+            brokers.add(new Node(nodeId, host, port));
+        }
+        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
+        for (Object topicStruct : (Object[]) response.get("topic_metadata")) {
+            Struct topicInfo = (Struct) topicStruct;
+            short topicError = topicInfo.getShort("topic_error_code");
+            if (topicError != Errors.NONE.code())
+                continue;
+            String topic = topicInfo.getString("topic");
+            for (Object partitionStruct : (Object[]) topicInfo.get("partition_metadata")) {
+                Struct partitionInfo = (Struct) partitionStruct;
+                short partError = partitionInfo.getShort("partition_error_code");
+                if (partError != Errors.NONE.code())
+                    continue;
+                int partition = partitionInfo.getInt("partition_id");
+                int leader = partitionInfo.getInt("leader");
+                int[] replicas = intArray((Object[]) partitionInfo.get("replicas"));
+                int[] isr = intArray((Object[]) partitionInfo.get("isr"));
+                partitions.add(new PartitionInfo(topic, partition, leader, replicas, isr));
+            }
+        }
+        return new Cluster(brokers, partitions);
+    }
+
+    /**
+     * Copy an array of boxed Integer values into a primitive int array of the same length.
+     */
+    private static int[] intArray(Object[] ints) {
+        int[] copy = new int[ints.length];
+        for (int i = 0; i < ints.length; i++)
+            copy[i] = (Integer) ints[i];
+        return copy;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/Protocol.java b/clients/src/main/java/kafka/common/protocol/Protocol.java
new file mode 100644
index 0000000..e191d6a
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/Protocol.java
@@ -0,0 +1,130 @@
+package kafka.common.protocol;
+
+import static kafka.common.protocol.types.Type.BYTES;
+import static kafka.common.protocol.types.Type.INT16;
+import static kafka.common.protocol.types.Type.INT32;
+import static kafka.common.protocol.types.Type.INT64;
+import static kafka.common.protocol.types.Type.STRING;
+import kafka.common.protocol.types.ArrayOf;
+import kafka.common.protocol.types.Field;
+import kafka.common.protocol.types.Schema;
+
+public class Protocol {
+ /*
+ * Schema definitions for the request and response headers and for version 0 of the metadata and produce apis,
+ * together with per-api tables holding every schema version. Static initializers run in declaration order, so a
+ * schema has to be declared before any schema or table that refers to it.
+ */
+ public static Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
+ new Field("api_version", INT16, "The version of the API."),
+ new Field("correlation_id",
+ INT32,
+ "A user-supplied integer value that will be passed back with the response"),
+ new Field("client_id",
+ STRING,
+ "A user specified identifier for the client making the request."));
+ /** Header schema for responses; it carries only the correlation id. */
+ public static Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
+ INT32,
+ "The user-supplied value passed in with the request"));
+
+ /* Metadata api */
+ /** Version 0 metadata request: an array of topic names. */
+ public static Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
+ new ArrayOf(STRING),
+ "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics."));
+ /** One broker entry of a metadata response: node id, host and port. */
+ public static Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."),
+ new Field("host", STRING, "The hostname of the broker."),
+ new Field("port", INT32, "The port on which the broker accepts requests."));
+ /** Version 0 per-partition metadata: error code, partition id, leader, replicas and isr. */
+ public static Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
+ INT16,
+ "The error code for the partition, if any."),
+ new Field("partition_id", INT32, "The id of the partition."),
+ new Field("leader",
+ INT32,
+ "The id of the broker acting as leader for this partition."),
+ new Field("replicas",
+ new ArrayOf(INT32),
+ "The set of all nodes that host this partition."),
+ new Field("isr",
+ new ArrayOf(INT32),
+ "The set of nodes that are in sync with the leader for this partition."));
+ /** Version 0 per-topic metadata: error code, topic name and an array of PARTITION_METADATA_V0 entries. */
+ public static Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
+ new Field("topic", STRING, "The name of the topic"),
+ new Field("partition_metadata",
+ new ArrayOf(PARTITION_METADATA_V0),
+ "Metadata for each partition of the topic."));
+ /** Version 0 metadata response: an array of BROKER entries and an array of TOPIC_METADATA_V0 entries. */
+ public static Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
+ new ArrayOf(BROKER),
+ "Host and port information for all brokers."),
+ new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V0)));
+ /* All schema versions of the metadata api; currently only version 0. */
+ public static Schema[] METADATA_REQUEST = new Schema[] { METADATA_REQUEST_V0 };
+ public static Schema[] METADATA_RESPONSE = new Schema[] { METADATA_RESPONSE_V0 };
+
+ /* Produce api */
+ /** Version 0 produce data for one topic: the topic name and an array of (partition, message_set) entries. */
+ public static Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
+ new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
+ new Field("message_set", BYTES)))));
+ /** Version 0 produce request: acks, timeout and an array of TOPIC_PRODUCE_DATA_V0 entries. */
+ public static Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
+ INT16,
+ "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
+ new Field("timeout", INT32, "The time to await a response in ms."),
+ new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
+ /** Version 0 produce response: per topic, an array of (partition, error_code, base_offset) entries. */
+ public static Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(new Schema(new Field("topic", STRING),
+ new Field("partition_responses",
+ new ArrayOf(new Schema(new Field("partition",
+ INT32),
+ new Field("error_code",
+ INT16),
+ new Field("base_offset",
+ INT64))))))));
+ /* All schema versions of the produce api; currently only version 0. */
+ public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 };
+ public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 };
+
+ /* an array of all requests and responses with all schema versions */
+ public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
+ public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
+
+ /* the latest version of each api */
+ public static short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
+ /* Fill in the tables. An api registered with an empty schema array ends up with a CURR_VERSION of -1. */
+ static {
+ REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
+ REQUESTS[ApiKeys.FETCH.id] = new Schema[] {};
+ REQUESTS[ApiKeys.LIST_OFFSETS.id] = new Schema[] {};
+ REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
+ REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
+ REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+ REQUESTS[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
+ REQUESTS[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
+
+ RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
+ RESPONSES[ApiKeys.FETCH.id] = new Schema[] {};
+ RESPONSES[ApiKeys.LIST_OFFSETS.id] = new Schema[] {};
+ RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
+ RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
+ RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+ RESPONSES[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
+ RESPONSES[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
+
+ /* set the maximum version of each api */
+ for (ApiKeys api : ApiKeys.values())
+ CURR_VERSION[api.id] = (short) (REQUESTS[api.id].length - 1);
+
+ /* sanity check that we have the same number of request and response versions for each api */
+ for (ApiKeys api : ApiKeys.values())
+ if (REQUESTS[api.id].length != RESPONSES[api.id].length)
+ throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api "
+ + api.name
+ + " but "
+ + RESPONSES[api.id].length
+ + " response versions.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java b/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java
new file mode 100644
index 0000000..5daf95b
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java
@@ -0,0 +1,63 @@
+package kafka.common.protocol.types;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Represents a type for an array of a particular type.
+ * <p>
+ * An array is written as a 4-byte element count followed by each element written with the element type.
+ */
+public class ArrayOf extends Type {
+
+    private final Type type;
+
+    /**
+     * @param type the type of every element of the array
+     */
+    public ArrayOf(Type type) {
+        this.type = type;
+    }
+
+    /**
+     * Write the element count and then each element in order.
+     *
+     * @param buffer the buffer to write into
+     * @param o the array to write; it is cast to Object[]
+     */
+    @Override
+    public void write(ByteBuffer buffer, Object o) {
+        Object[] objs = (Object[]) o;
+        buffer.putInt(objs.length);
+        for (Object obj : objs)
+            type.write(buffer, obj);
+    }
+
+    /**
+     * Read the element count and then that many elements.
+     *
+     * @param buffer the buffer to read from
+     * @return an Object[] holding the elements in the order they were read
+     * @throws SchemaException if the element count read from the buffer is negative
+     */
+    @Override
+    public Object read(ByteBuffer buffer) {
+        int size = buffer.getInt();
+        // the count comes straight off the wire; reject a negative one before using it as an array length
+        if (size < 0)
+            throw new SchemaException("Invalid array size: " + size);
+        Object[] objs = new Object[size];
+        for (int i = 0; i < size; i++)
+            objs[i] = type.read(buffer);
+        return objs;
+    }
+
+    /**
+     * The serialized size of the array: 4 bytes for the element count plus the size of each element.
+     */
+    @Override
+    public int sizeOf(Object o) {
+        Object[] objs = (Object[]) o;
+        int size = 4;
+        for (Object obj : objs)
+            size += type.sizeOf(obj);
+        return size;
+    }
+
+    /**
+     * The type of the elements of this array.
+     */
+    public Type type() {
+        return type;
+    }
+
+    @Override
+    public String toString() {
+        return "ARRAY(" + type + ")";
+    }
+
+    /**
+     * Check that the item is an Object[] and validate each element against the element type.
+     *
+     * @param item the object to validate
+     * @return the item cast to Object[]
+     * @throws SchemaException if the item is null or is not an Object[]
+     */
+    @Override
+    public Object[] validate(Object item) {
+        // test the type up front so that only a wrong top-level item is reported as "Not an Object[]."
+        if (!(item instanceof Object[]))
+            throw new SchemaException("Not an Object[].");
+        Object[] array = (Object[]) item;
+        for (Object element : array)
+            type.validate(element);
+        return array;
+    }
+}